Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Config parameter 'coordinator_not_ready_retry_timeout_ms' #166

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ class KafkaConsumer(six.Iterator):
metrics. Default: 2
metrics_sample_window_ms (int): The maximum age in milliseconds of
samples used to compute metrics. Default: 30000
coordinator_not_ready_retry_timeout_ms (int): The timeout in milliseconds
used to detect that the Kafka coordinator is not available. If None,
the consumer keeps polling indefinitely (the default behavior). Default: None
selector (selectors.BaseSelector): Provide a specific selector
implementation to use for I/O multiplexing.
Default: selectors.DefaultSelector
Expand Down Expand Up @@ -288,6 +291,7 @@ class KafkaConsumer(six.Iterator):
'metrics_num_samples': 2,
'metrics_sample_window_ms': 30000,
'metric_group_prefix': 'consumer',
'coordinator_not_ready_retry_timeout_ms': None,
'selector': selectors.DefaultSelector,
'exclude_internal_topics': True,
'sasl_mechanism': None,
Expand Down
21 changes: 18 additions & 3 deletions kafka/coordinator/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class BaseCoordinator(object):
'group_id': 'kafka-python-default-group',
'session_timeout_ms': 10000,
'heartbeat_interval_ms': 3000,
'coordinator_not_ready_retry_timeout_ms': None,
'max_poll_interval_ms': 300000,
'retry_backoff_ms': 100,
'api_version': (0, 10, 1),
Expand All @@ -98,7 +99,7 @@ def __init__(self, client, metrics, **configs):
partition assignment (if enabled), and to use for fetching and
committing offsets. Default: 'kafka-python-default-group'
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
using Kafka's group management facilities. Default: 30000
heartbeat_interval_ms (int): The expected time in milliseconds
between heartbeats to the consumer coordinator when using
Kafka's group management feature. Heartbeats are used to ensure
Expand All @@ -108,6 +109,10 @@ def __init__(self, client, metrics, **configs):
should be set no higher than 1/3 of that value. It can be
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
coordinator_not_ready_retry_timeout_ms (int): The timeout in
milliseconds used to detect that the Kafka coordinator is not
available. If None, the client keeps polling indefinitely (the
default behavior). Default: None
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
"""
Expand Down Expand Up @@ -240,10 +245,12 @@ def coordinator(self):
else:
return self.coordinator_id

def ensure_coordinator_ready(self):
def ensure_coordinator_ready(self, timeout_ms=None):
"""Block until the coordinator for this group is known
(and we have an active connection -- java client uses unsent queue).
"""
retry_timeout_ms = timeout_ms or self.config['coordinator_not_ready_retry_timeout_ms']
retry_start_time_in_secs = time.time()
with self._lock:
while self.coordinator_unknown():

Expand All @@ -261,7 +268,15 @@ def ensure_coordinator_ready(self):

if future.failed():
if future.retriable():
if getattr(future.exception, 'invalid_metadata', False):
if retry_timeout_ms is not None and isinstance(
future.exception, (Errors.NodeNotReadyError, Errors.NoBrokersAvailable)):
remaining_retry_timeout_ms = retry_timeout_ms - (
time.time() - retry_start_time_in_secs) * 1000
if remaining_retry_timeout_ms <= 0:
raise future.exception # pylint: disable-msg=raising-bad-type
self._client.poll(timeout_ms=min(
self.config['retry_backoff_ms'], remaining_retry_timeout_ms))
elif getattr(future.exception, 'invalid_metadata', False):
log.debug('Requesting metadata for group coordinator request: %s', future.exception)
metadata_update = self._client.cluster.request_update()
self._client.poll(future=metadata_update)
Expand Down
9 changes: 7 additions & 2 deletions kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ class ConsumerCoordinator(BaseCoordinator):
'retry_backoff_ms': 100,
'api_version': (0, 10, 1),
'exclude_internal_topics': True,
'metric_group_prefix': 'consumer'
'metric_group_prefix': 'consumer',
'coordinator_not_ready_retry_timeout_ms': None
}

def __init__(self, client, subscription, metrics, **configs):
Expand Down Expand Up @@ -68,13 +69,17 @@ def __init__(self, client, subscription, metrics, **configs):
adjusted even lower to control the expected time for normal
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
using Kafka's group management facilities. Default: 30000
retry_backoff_ms (int): Milliseconds to backoff when retrying on
errors. Default: 100.
exclude_internal_topics (bool): Whether records from internal topics
(such as offsets) should be exposed to the consumer. If set to
True the only way to receive records from an internal topic is
subscribing to it. Requires 0.10+. Default: True
coordinator_not_ready_retry_timeout_ms (int): The timeout in milliseconds
used to detect that the Kafka coordinator is not available. If None,
the coordinator keeps polling indefinitely (the default behavior).
Default: None
"""
super(ConsumerCoordinator, self).__init__(client, metrics, **configs)

Expand Down