Skip to content

Commit

Permalink
Merge branch 'master' into feature/skip_control_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
wbarnha committed Mar 20, 2024
2 parents 7fc565b + 3c124b2 commit 8eb2c52
Show file tree
Hide file tree
Showing 110 changed files with 1,359 additions and 3,410 deletions.
3 changes: 0 additions & 3 deletions .covrc

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ jobs:
needs:
- build-sdist
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 15
strategy:
fail-fast: false
matrix:
Expand Down
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 2.0.3 (under development)

Consumer
* KIP-345: Implement static membership support

# 2.0.2 (Sep 29, 2020)

Consumer
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ test-local: build-integration
cov-local: build-integration
KAFKA_VERSION=$(KAFKA_VERSION) SCALA_VERSION=$(SCALA_VERSION) pytest \
--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka \
--cov-config=.covrc --cov-report html $(FLAGS) kafka test
--cov-report html $(FLAGS) kafka test
@echo "open file://`pwd`/htmlcov/index.html"

# Check the readme for syntax errors, which can lead to invalid formatting on
Expand Down
6 changes: 5 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ that expose basic message attributes: topic, partition, offset, key, and value:

.. code-block:: python
# join a consumer group for dynamic partition assignment and offset commits
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group')
# or as a static member with a fixed group member name
# consumer = KafkaConsumer('my_favorite_topic', group_id='my_favorite_group',
# group_instance_id='consumer-1', leave_group_on_close=False)
for msg in consumer:
print (msg)
Expand Down
2 changes: 0 additions & 2 deletions benchmarks/consumer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
import threading
import traceback

from kafka.vendor.six.moves import range

from kafka import KafkaConsumer, KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

Expand Down
2 changes: 0 additions & 2 deletions benchmarks/producer_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import threading
import traceback

from kafka.vendor.six.moves import range

from kafka import KafkaProducer
from test.fixtures import KafkaFixture, ZookeeperFixture

Expand Down
25 changes: 9 additions & 16 deletions benchmarks/varint_speed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
#!/usr/bin/env python
from __future__ import print_function
import pyperf
from kafka.vendor import six


test_data = [
Expand Down Expand Up @@ -67,6 +65,10 @@
BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC))


def int2byte(i):
return bytes((i),)


def _assert_valid_enc(enc_func):
for encoded, decoded in test_data:
assert enc_func(decoded) == encoded, decoded
Expand Down Expand Up @@ -116,7 +118,7 @@ def encode_varint_1(num):
_assert_valid_enc(encode_varint_1)


def encode_varint_2(value, int2byte=six.int2byte):
def encode_varint_2(value, int2byte=int2byte):
value = (value << 1) ^ (value >> 63)

bits = value & 0x7f
Expand Down Expand Up @@ -151,7 +153,7 @@ def encode_varint_3(value, buf):
assert res == encoded


def encode_varint_4(value, int2byte=six.int2byte):
def encode_varint_4(value, int2byte=int2byte):
value = (value << 1) ^ (value >> 63)

if value <= 0x7f: # 1 byte
Expand Down Expand Up @@ -301,22 +303,13 @@ def size_of_varint_2(value):
_assert_valid_size(size_of_varint_2)


if six.PY3:
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer
Raises:
IndexError: if position is out of bounds
"""
return memview[pos]
else:
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer
def _read_byte(memview, pos):
""" Read a byte from memoryview as an integer
Raises:
IndexError: if position is out of bounds
"""
return ord(memview[pos])
return memview[pos]


def decode_varint_1(buffer, pos=0):
Expand Down
7 changes: 7 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
Changelog
=========

2.2.0
####################

Consumer
--------
* KIP-345: Implement static membership support


2.0.2 (Sep 29, 2020)
####################
Expand Down
12 changes: 12 additions & 0 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ KafkaConsumer
group_id='my-group',
bootstrap_servers='my.server.com')
# Use multiple static consumers w/ 2.3.0 kafka brokers
consumer1 = KafkaConsumer('my-topic',
group_id='my-group',
group_instance_id='process-1',
leave_group_on_close=False,
bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
group_id='my-group',
group_instance_id='process-2',
leave_group_on_close=False,
bootstrap_servers='my.server.com')
There are many configuration options for the consumer class. See
:class:`~kafka.KafkaConsumer` API documentation for more details.
Expand Down
2 changes: 0 additions & 2 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

__title__ = 'kafka'
from kafka.version import __version__
__author__ = 'Dana Powers'
Expand Down
2 changes: 0 additions & 2 deletions kafka/admin/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
from __future__ import absolute_import

from kafka.admin.config_resource import ConfigResource, ConfigResourceType
from kafka.admin.client import KafkaAdminClient
from kafka.admin.acl_resource import (ACL, ACLFilter, ResourcePattern, ResourcePatternFilter, ACLOperation,
Expand Down
20 changes: 7 additions & 13 deletions kafka/admin/acl_resource.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
from __future__ import absolute_import
from kafka.errors import IllegalArgumentError
from enum import IntEnum

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum
from kafka.errors import IllegalArgumentError


class ResourceType(IntEnum):
Expand Down Expand Up @@ -69,7 +63,7 @@ class ACLResourcePatternType(IntEnum):
PREFIXED = 4


class ACLFilter(object):
class ACLFilter:
"""Represents a filter to use with describing and deleting ACLs
The difference between this class and the ACL class is mainly that
Expand Down Expand Up @@ -161,7 +155,7 @@ def __init__(
permission_type,
resource_pattern
):
super(ACL, self).__init__(principal, host, operation, permission_type, resource_pattern)
super().__init__(principal, host, operation, permission_type, resource_pattern)
self.validate()

def validate(self):
Expand All @@ -173,7 +167,7 @@ def validate(self):
raise IllegalArgumentError("resource_pattern must be a ResourcePattern object")


class ResourcePatternFilter(object):
class ResourcePatternFilter:
def __init__(
self,
resource_type,
Expand Down Expand Up @@ -232,13 +226,13 @@ def __init__(
resource_name,
pattern_type=ACLResourcePatternType.LITERAL
):
super(ResourcePattern, self).__init__(resource_type, resource_name, pattern_type)
super().__init__(resource_type, resource_name, pattern_type)
self.validate()

def validate(self):
if self.resource_type == ResourceType.ANY:
raise IllegalArgumentError("resource_type cannot be ANY")
if self.pattern_type in [ACLResourcePatternType.ANY, ACLResourcePatternType.MATCH]:
raise IllegalArgumentError(
"pattern_type cannot be {} on a concrete ResourcePattern".format(self.pattern_type.name)
f"pattern_type cannot be {self.pattern_type.name} on a concrete ResourcePattern"
)
29 changes: 21 additions & 8 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from __future__ import absolute_import

from collections import defaultdict
import copy
import logging
import socket

from . import ConfigResourceType
from kafka.vendor import six

from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
ACLResourcePatternType
Expand All @@ -20,7 +17,7 @@
from kafka.protocol.admin import (
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
DeleteGroupsRequest
DeleteGroupsRequest, DescribeLogDirsRequest
)
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
Expand All @@ -32,7 +29,7 @@
log = logging.getLogger(__name__)


class KafkaAdminClient(object):
class KafkaAdminClient:
"""A class for administering the Kafka cluster.
Warning:
Expand Down Expand Up @@ -194,7 +191,7 @@ def __init__(self, **configs):
log.debug("Starting KafkaAdminClient with configuration: %s", configs)
extra_configs = set(configs).difference(self.DEFAULT_CONFIG)
if extra_configs:
raise KafkaConfigurationError("Unrecognized configs: {}".format(extra_configs))
raise KafkaConfigurationError(f"Unrecognized configs: {extra_configs}")

self.config = copy.copy(self.DEFAULT_CONFIG)
self.config.update(configs)
Expand Down Expand Up @@ -874,7 +871,7 @@ def describe_configs(self, config_resources, include_synonyms=False):
))
else:
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
f"Support for DescribeConfigs v{version} has not yet been added to KafkaAdminClient.")

self._wait_for_futures(futures)
return [f.value for f in futures]
Expand Down Expand Up @@ -1197,7 +1194,7 @@ def _list_consumer_group_offsets_send_request(self, group_id,
topics_partitions_dict = defaultdict(set)
for topic, partition in partitions:
topics_partitions_dict[topic].add(partition)
topics_partitions = list(six.iteritems(topics_partitions_dict))
topics_partitions = list(topics_partitions_dict.items())
request = OffsetFetchRequest[version](group_id, topics_partitions)
else:
raise NotImplementedError(
Expand Down Expand Up @@ -1345,3 +1342,19 @@ def _wait_for_futures(self, futures):

if future.failed():
raise future.exception # pylint: disable-msg=raising-bad-type

def describe_log_dirs(self):
"""Send a DescribeLogDirsRequest request to a broker.
:return: A message future
"""
version = self._matching_api_version(DescribeLogDirsRequest)
if version <= 1:
request = DescribeLogDirsRequest[version]()
future = self._send_request_to_node(self._client.least_loaded_node(), request)
self._wait_for_futures([future])
else:
raise NotImplementedError(
"Support for DescribeLogDirsRequest_v{} has not yet been added to KafkaAdminClient."
.format(version))
return future.value
11 changes: 2 additions & 9 deletions kafka/admin/config_resource.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
from __future__ import absolute_import

# enum in stdlib as of py3.4
try:
from enum import IntEnum # pylint: disable=import-error
except ImportError:
# vendored backport module
from kafka.vendor.enum34 import IntEnum
from enum import IntEnum


class ConfigResourceType(IntEnum):
Expand All @@ -15,7 +8,7 @@ class ConfigResourceType(IntEnum):
TOPIC = 2


class ConfigResource(object):
class ConfigResource:
"""A class for specifying config resources.
Arguments:
resource_type (ConfigResourceType): the type of kafka resource
Expand Down
5 changes: 1 addition & 4 deletions kafka/admin/new_partitions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
from __future__ import absolute_import


class NewPartitions(object):
class NewPartitions:
"""A class for new partition creation on existing topics. Note that the length of new_assignments, if specified,
must be the difference between the new total number of partitions and the existing number of partitions.
Arguments:
Expand Down
4 changes: 1 addition & 3 deletions kafka/admin/new_topic.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
from __future__ import absolute_import

from kafka.errors import IllegalArgumentError


class NewTopic(object):
class NewTopic:
""" A class for new topic creation
Arguments:
name (string): name of the topic
Expand Down
Loading

0 comments on commit 8eb2c52

Please sign in to comment.