Skip to content

Commit

Permalink
KIP-345 Static membership implementation (#137)
Browse files Browse the repository at this point in the history
* KIP-345 Add static consumer membership support

* KIP-345 Add examples to docs

* KIP-345 Add leave_group_on_close flag

https://issues.apache.org/jira/browse/KAFKA-6995

* KIP-345 Add tests for static membership

* KIP-345 Update docs for leave_group_on_close option

* Update changelog.rst

* remove six from base.py

* Update base.py

* Update base.py

* Update base.py

* Update changelog.rst

* Update README.rst

---------

Co-authored-by: Denis Kazakov <d.kazakov@mcplat.ru>
Co-authored-by: Denis Kazakov <denis@kazakov.ru.net>
  • Loading branch information
3 people authored Mar 20, 2024
1 parent 0259502 commit 3c124b2
Show file tree
Hide file tree
Showing 10 changed files with 302 additions and 41 deletions.
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 2.0.3 (under development)

Consumer
* KIP-345: Implement static membership support

# 2.0.2 (Sep 29, 2020)

Consumer
Expand Down
6 changes: 5 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ that expose basic message attributes: topic, partition, offset, key, and value:

.. code-block:: python
# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
# or as a static member with a fixed group member name
# consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group',
# group_instance_id='consumer-1', leave_group_on_close=False)
for msg in consumer:
print (msg)
Expand Down
7 changes: 7 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

2.2.0
####################

Consumer
--------
* KIP-345: Implement static membership support


2.0.2 (Sep 29, 2020)
####################
Expand Down
12 changes: 12 additions & 0 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ KafkaConsumer
group_id='my-group',
bootstrap_servers='my.server.com')
# Use multiple static consumers w/ 2.3.0 kafka brokers
consumer1 = KafkaConsumer('my-topic',
group_id='my-group',
group_instance_id='process-1',
leave_group_on_close=False,
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
group_id='my-group',
group_instance_id='process-2',
leave_group_on_close=False,
bootstrap_servers='my.server.com')
There are many configuration options for the consumer class. See
:class:`~kafka.KafkaConsumer` API documentation for more details.
Expand Down
12 changes: 11 additions & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ class KafkaConsumer:
committing offsets. If None, auto-partition assignment (via
group coordinator) and offset commits are disabled.
Default: None
group_instance_id (str): the unique identifier to distinguish
each client instance. If set and leave_group_on_close is
False consumer group rebalancing won't be triggered until
sessiont_timeout_ms is met. Requires 2.3.0+.
leave_group_on_close (bool or None): whether to leave a consumer
group or not on consumer shutdown.
key_deserializer (callable): Any callable that takes a
raw message key and returns a deserialized key.
value_deserializer (callable): Any callable that takes a
Expand Down Expand Up @@ -241,6 +247,7 @@ class KafkaConsumer:
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
kafka_client (callable): Custom class / callable for creating KafkaClient instances
coordinator (callable): Custom class / callable for creating ConsumerCoordinator instances
Note:
Configuration parameters are described in more detail at
Expand All @@ -250,6 +257,8 @@ class KafkaConsumer:
'bootstrap_servers': 'localhost',
'client_id': 'kafka-python-' + __version__,
'group_id': None,
'group_instance_id': '',
'leave_group_on_close': None,
'key_deserializer': None,
'value_deserializer': None,
'fetch_max_wait_ms': 500,
Expand Down Expand Up @@ -304,6 +313,7 @@ class KafkaConsumer:
'sasl_oauth_token_provider': None,
'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
'kafka_client': KafkaClient,
'coordinator': ConsumerCoordinator,
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

Expand Down Expand Up @@ -379,7 +389,7 @@ def __init__(self, *topics, **configs):
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
self._fetcher = Fetcher(
self._client, self._subscription, self._metrics, **self.config)
self._coordinator = ConsumerCoordinator(
self._coordinator = self.config['coordinator'](
self._client, self._subscription, self._metrics,
assignors=self.config['partition_assignment_strategy'],
**self.config)
Expand Down
140 changes: 111 additions & 29 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ class BaseCoordinator:

DEFAULT_CONFIG = {
'group_id': 'kafka-python-default-group',
'group_instance_id': '',
'leave_group_on_close': None,
'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
'max_poll_interval_ms': 300000,
Expand All @@ -92,6 +94,12 @@ def __init__(self, client, metrics, **configs):
group_id (str): name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
committing offsets. Default: 'kafka-python-default-group'
group_instance_id (str): the unique identifier to distinguish
each client instance. If set and leave_group_on_close is
False consumer group rebalancing won't be triggered until
sessiont_timeout_ms is met. Requires 2.3.0+.
leave_group_on_close (bool or None): whether to leave a consumer
group or not on consumer shutdown.
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group management facilities. Default: 30000
heartbeat_interval_ms (int): The expected time in milliseconds
Expand All @@ -117,6 +125,11 @@ def __init__(self, client, metrics, **configs):
"different values for max_poll_interval_ms "
"and session_timeout_ms")

if self.config['group_instance_id'] and self.config['api_version'] < (2, 3, 0):
raise Errors.KafkaConfigurationError(
'Broker version %s does not support static membership' % (self.config['api_version'],),
)

self._client = client
self.group_id = self.config['group_id']
self.heartbeat = Heartbeat(**self.config)
Expand Down Expand Up @@ -451,30 +464,48 @@ def _send_join_group_request(self):
if self.config['api_version'] < (0, 9):
raise Errors.KafkaError('JoinGroupRequest api requires 0.9+ brokers')
elif (0, 9) <= self.config['api_version'] < (0, 10, 1):
request = JoinGroupRequest[0](
version = 0
args = (
self.group_id,
self.config['session_timeout_ms'],
self._generation.member_id,
self.protocol_type(),
member_metadata)
member_metadata,
)
elif (0, 10, 1) <= self.config['api_version'] < (0, 11, 0):
request = JoinGroupRequest[1](
version = 1
args = (
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.protocol_type(),
member_metadata)
member_metadata,
)
elif self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 5
args = (
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.config['group_instance_id'],
self.protocol_type(),
member_metadata,
)
else:
request = JoinGroupRequest[2](
version = 2
args = (
self.group_id,
self.config['session_timeout_ms'],
self.config['max_poll_interval_ms'],
self._generation.member_id,
self.protocol_type(),
member_metadata)
member_metadata,
)

# create the request for the coordinator
request = JoinGroupRequest[version](*args)
log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
Expand Down Expand Up @@ -558,12 +589,25 @@ def _handle_join_group_response(self, future, send_time, response):

def _on_join_follower(self):
# send follower's sync group with an empty assignment
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
{})
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 3
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.config['group_instance_id'],
{},
)
else:
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
{},
)

request = SyncGroupRequest[version](*args)
log.debug("Sending follower SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
Expand All @@ -586,15 +630,30 @@ def _on_join_leader(self, response):
except Exception as e:
return Future().failure(e)

version = 0 if self.config['api_version'] < (0, 11, 0) else 1
request = SyncGroupRequest[version](
self.group_id,
self._generation.generation_id,
self._generation.member_id,
[(member_id,
assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in group_assignment.items()])
group_assignment = [
(member_id, assignment if isinstance(assignment, bytes) else assignment.encode())
for member_id, assignment in group_assignment.items()
]

if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 3
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.config['group_instance_id'],
group_assignment,
)
else:
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
group_assignment,
)

request = SyncGroupRequest[version](*args)
log.debug("Sending leader SyncGroup for group %s to coordinator %s: %s",
self.group_id, self.coordinator_id, request)
return self._send_sync_group_request(request)
Expand Down Expand Up @@ -760,15 +819,22 @@ def close(self):
def maybe_leave_group(self):
"""Leave the current group and reset local generation/memberId."""
with self._client._lock, self._lock:
if (not self.coordinator_unknown()
if (
not self.coordinator_unknown()
and self.state is not MemberState.UNJOINED
and self._generation is not Generation.NO_GENERATION):

and self._generation is not Generation.NO_GENERATION
and self._leave_group_on_close()
):
# this is a minimal effort attempt to leave the group. we do not
# attempt any resending if the request fails or times out.
log.info('Leaving consumer group (%s).', self.group_id)
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
request = LeaveGroupRequest[version](self.group_id, self._generation.member_id)
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 3
args = (self.group_id, [(self._generation.member_id, self.config['group_instance_id'])])
else:
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
args = self.group_id, self._generation.member_id
request = LeaveGroupRequest[version](*args)
future = self._client.send(self.coordinator_id, request)
future.add_callback(self._handle_leave_group_response)
future.add_errback(log.error, "LeaveGroup request failed: %s")
Expand All @@ -795,10 +861,23 @@ def _send_heartbeat_request(self):
e = Errors.NodeNotReadyError(self.coordinator_id)
return Future().failure(e)

version = 0 if self.config['api_version'] < (0, 11, 0) else 1
request = HeartbeatRequest[version](self.group_id,
self._generation.generation_id,
self._generation.member_id)
if self.config['api_version'] >= (2, 3, 0) and self.config['group_instance_id']:
version = 2
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
self.config['group_instance_id'],
)
else:
version = 0 if self.config['api_version'] < (0, 11, 0) else 1
args = (
self.group_id,
self._generation.generation_id,
self._generation.member_id,
)

request = HeartbeatRequest[version](*args)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()
_f = self._client.send(self.coordinator_id, request)
Expand Down Expand Up @@ -845,6 +924,9 @@ def _handle_heartbeat_response(self, future, send_time, response):
log.error("Heartbeat failed: Unhandled error: %s", error)
future.failure(error)

def _leave_group_on_close(self):
return self.config['leave_group_on_close'] is None or self.config['leave_group_on_close']


class GroupCoordinatorMetrics:
def __init__(self, heartbeat, metrics, prefix, tags=None):
Expand Down
17 changes: 15 additions & 2 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class ConsumerCoordinator(BaseCoordinator):
"""This class manages the coordination process with the consumer coordinator."""
DEFAULT_CONFIG = {
'group_id': 'kafka-python-default-group',
'group_instance_id': '',
'leave_group_on_close': None,
'enable_auto_commit': True,
'auto_commit_interval_ms': 5000,
'default_offset_commit_callback': None,
Expand All @@ -45,6 +47,12 @@ def __init__(self, client, subscription, metrics, **configs):
group_id (str): name of the consumer group to join for dynamic
partition assignment (if enabled), and to use for fetching and
committing offsets. Default: 'kafka-python-default-group'
group_instance_id (str): the unique identifier to distinguish
each client instance. If set and leave_group_on_close is
False consumer group rebalancing won't be triggered until
sessiont_timeout_ms is met. Requires 2.3.0+.
leave_group_on_close (bool or None): whether to leave a consumer
group or not on consumer shutdown.
enable_auto_commit (bool): If true the consumer's offset will be
periodically committed in the background. Default: True.
auto_commit_interval_ms (int): milliseconds between automatic
Expand Down Expand Up @@ -304,10 +312,15 @@ def _perform_assignment(self, leader_id, assignment_strategy, members):
assert assignor, f'Invalid assignment protocol: {assignment_strategy}'
member_metadata = {}
all_subscribed_topics = set()
for member_id, metadata_bytes in members:

for member in members:
if len(member) == 3:
member_id, group_instance_id, metadata_bytes = member
else:
member_id, metadata_bytes = member
metadata = ConsumerProtocol.METADATA.decode(metadata_bytes)
member_metadata[member_id] = metadata
all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member
all_subscribed_topics.update(metadata.subscription) # pylint: disable-msg=no-member

# the leader will begin watching for changes to any of the topics
# the group is interested in, which ensures that all metadata changes
Expand Down
Loading

0 comments on commit 3c124b2

Please sign in to comment.