diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 63a0f3bb7..d37258857 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -10,6 +10,7 @@ from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \ ACLResourcePatternType +from kafka.admin.leader_election_resources import ElectionType from kafka.client_async import KafkaClient, selectors from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol import kafka.errors as Errors @@ -20,7 +21,7 @@ from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest, - DeleteGroupsRequest + DeleteGroupsRequest, ElectLeadersRequest ) from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest @@ -393,27 +394,55 @@ def _send_request_to_controller(self, request): # So this is a little brittle in that it assumes all responses have # one of these attributes and that they always unpack into # (topic, error_code) tuples. - topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors') - else response.topic_error_codes) - # Also small py2/py3 compatibility -- py3 can ignore extra values - # during unpack via: for x, y, *rest in list_of_values. py2 cannot. - # So for now we have to map across the list and explicitly drop any - # extra values (usually the error_message) - for topic, error_code in map(lambda e: e[:2], topic_error_tuples): + topic_error_tuples = getattr(response, 'topic_errors', getattr(response, 'topic_error_codes', None)) + if topic_error_tuples is not None: + success = self._parse_topic_request_response(topic_error_tuples, request, response, tries) + else: + # Leader Election request has a two layer error response (topic and partition) + success = self._parse_topic_partition_request_response(request, response, tries) + + if success: + return response + raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered") + + def _parse_topic_request_response(self, topic_error_tuples, request, response, tries): + # Also small py2/py3 compatibility -- py3 can ignore extra values + # during unpack via: for x, y, *rest in list_of_values. py2 cannot. + # So for now we have to map across the list and explicitly drop any + # extra values (usually the error_message) + for topic, error_code in map(lambda e: e[:2], topic_error_tuples): + error_type = Errors.for_code(error_code) + if tries and error_type is NotControllerError: + # No need to inspect the rest of the errors for + # non-retriable errors because NotControllerError should + # either be thrown for all errors or no errors. + self._refresh_controller_id() + return False + elif error_type is not Errors.NoError: + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + return True + + def _parse_topic_partition_request_response(self, request, response, tries): + # Also small py2/py3 compatibility -- py3 can ignore extra values + # during unpack via: for x, y, *rest in list_of_values. py2 cannot. + # So for now we have to map across the list and explicitly drop any + # extra values (usually the error_message) + for topic, partition_results in response.replication_election_results: + for partition_id, error_code in map(lambda e: e[:2], partition_results): error_type = Errors.for_code(error_code) if tries and error_type is NotControllerError: # No need to inspect the rest of the errors for # non-retriable errors because NotControllerError should # either be thrown for all errors or no errors. self._refresh_controller_id() - break - elif error_type is not Errors.NoError: + return False + elif error_type not in [Errors.NoError, Errors.ElectionNotNeeded]: raise error_type( "Request '{}' failed with response '{}'." .format(request, response)) - else: - return response - raise RuntimeError("This should never happen, please file a bug with full stacktrace if encountered") + return True @staticmethod def _convert_new_topic_request(new_topic): @@ -1337,10 +1366,60 @@ def _delete_consumer_groups_send_request(self, group_ids, group_coordinator_id): .format(version)) return self._send_request_to_node(group_coordinator_id, request) + @staticmethod + def _convert_topic_partitions(topic_partitions): + return [ + ( + topic, + partition_ids + ) + for topic, partition_ids in topic_partitions.items() + ] + + def _get_all_topic_partitions(self): + return [ + ( + topic, + [partition_info.partition for partition_info in self._client.cluster._partitions[topic].values()] + ) + for topic in self._client.cluster.topics() + ] + + def _get_topic_partitions(self, topic_partitions): + if topic_partitions is None: + return self._get_all_topic_partitions() + return self._convert_topic_partitions(topic_partitions) + + def perform_leader_election(self, election_type, topic_partitions=None, timeout_ms=None): + """Perform leader election on the topic partitions. + + :param election_type: Type of election to attempt. 0 for Perferred, 1 for Unclean + :param topic_partitions: A map of topic name strings to partition ids list. + By default, will run on all topic partitions + :param timeout_ms: Milliseconds to wait for the leader election process to complete + before the broker returns. + + :return: Appropriate version of CreateTopicResponse class. + """ + version = self._matching_api_version(ElectLeadersRequest) + timeout_ms = self._validate_timeout(timeout_ms) + if 0 < version <= 1: + request = ElectLeadersRequest[version]( + election_type=ElectionType(election_type), + topic_partitions=self._get_topic_partitions(topic_partitions), + timeout=timeout_ms, + ) + else: + raise NotImplementedError( + "Support for CreateTopics v{} has not yet been added to KafkaAdminClient." + .format(version)) + # TODO convert structs to a more pythonic interface + return self._send_request_to_controller(request) + def _wait_for_futures(self, futures): while not all(future.succeeded() for future in futures): for future in futures: self._client.poll(future=future) if future.failed(): - raise future.exception # pylint: disable-msg=raising-bad-type + raise future.exception # pylint: disable-msg=raising-bad-type \ No newline at end of file diff --git a/kafka/admin/leader_election_resources.py b/kafka/admin/leader_election_resources.py new file mode 100644 index 000000000..bbdd43239 --- /dev/null +++ b/kafka/admin/leader_election_resources.py @@ -0,0 +1,15 @@ +from __future__ import absolute_import + +# enum in stdlib as of py3.4 +try: + from enum import IntEnum # pylint: disable=import-error +except ImportError: + # vendored backport module + from kafka.vendor.enum34 import IntEnum + +class ElectionType(IntEnum): + """ Leader election type + """ + + PREFERRED = 0, + UNCLEAN = 1 \ No newline at end of file diff --git a/kafka/errors.py b/kafka/errors.py index b33cf51e2..6210b0f2e 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -461,6 +461,12 @@ class GroupIdNotFoundError(BrokerResponseError): description = 'The group id does not exist.' +class ElectionNotNeeded(BrokerResponseError): + errno = 84 + message = 'ELECTION_NOT_NEEDED' + description = 'Leader election not needed for topic partition.' + + class KafkaUnavailableError(KafkaError): pass diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index f9d61e5cd..d94c2b540 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -1052,3 +1052,68 @@ class ListPartitionReassignmentsRequest_v0(Request): ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0] ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0] + + +class ElectLeadersResponse_v0(Response): + API_KEY = 43 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('replication_election_results', Array( + ('topic', String('utf-8')), + ('partition_result', Array( + ('partition_id', Int32), + ('error_code', Int16), + ('error_message', String('utf-8')) + )) + )) + ) + +class ElectLeadersRequest_v0(Request): + API_KEY = 43 + API_VERSION = 1 + RESPONSE_TYPE = ElectLeadersResponse_v0 + SCHEMA = Schema( + ('election_type', Int8), + ('topic_partitions', Array( + ('topic', String('utf-8')), + ('partition_ids', Array(Int32)) + )), + ('timeout', Int32), + ) + + +class ElectLeadersResponse_v1(Response): + API_KEY = 43 + API_VERSION = 1 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('replication_election_results', Array( + ('topic', String('utf-8')), + ('partition_result', Array( + ('partition_id', Int32), + ('error_code', Int16), + ('error_message', String('utf-8')) + )) + )) + ) + +class ElectLeadersRequest_v1(Request): + API_KEY = 43 + API_VERSION = 1 + RESPONSE_TYPE = ElectLeadersResponse_v1 + SCHEMA = Schema( + ('election_type', Int8), + ('topic_partitions', Array( + ('topic', String('utf-8')), + ('partition_ids', Array(Int32)) + )), + ('timeout', Int32), + ) + + +ElectLeadersRequest = [ElectLeadersRequest_v0, ElectLeadersRequest_v1] + +ElectLeadersResponse = [ElectLeadersResponse_v0, ElectLeadersResponse_v1]