28 changes: 17 additions & 11 deletions tests/common/__init__.py
@@ -55,22 +55,28 @@

@staticmethod
def update_conf_group_protocol(conf=None):
if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
if TestUtils.can_upgrade_group_protocol_to_consumer(conf):
conf['group.protocol'] = 'consumer'

SonarQube check failure on line 59 in tests/common/__init__.py: Define a constant instead of duplicating this literal 'group.protocol' 3 times.
@staticmethod
def can_upgrade_group_protocol_to_consumer(conf):
return (conf is not None and 'group.id' in conf and
'group.protocol' not in conf and TestUtils.use_group_protocol_consumer())

@staticmethod
def remove_forbidden_conf_group_protocol_consumer(conf):
if conf is None:
if (conf is None or
not TestUtils.use_group_protocol_consumer() or
conf.get('group.protocol', 'consumer') != 'consumer'):
return
if TestUtils.use_group_protocol_consumer():
forbidden_conf_properties = ["session.timeout.ms",
"partition.assignment.strategy",
"heartbeat.interval.ms",
"group.protocol.type"]
for prop in forbidden_conf_properties:
if prop in conf:
print("Skipping setting forbidden configuration {prop} for `CONSUMER` protocol")
del conf[prop]
forbidden_conf_properties = ["session.timeout.ms",
"partition.assignment.strategy",
"heartbeat.interval.ms",
"group.protocol.type"]
for prop in forbidden_conf_properties:
if prop in conf:
print(f"Skipping setting forbidden configuration {prop} for `CONSUMER` protocol")
del conf[prop]


class TestConsumer(Consumer):
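A note on the SonarQube finding above: one way to satisfy it is to hoist the duplicated literal into a single constant. The sketch below is illustrative only; GROUP_PROTOCOL_KEY is an assumed name, not something this PR introduces.

    # Hedged sketch, not part of the PR: one constant replaces the three
    # occurrences of the 'group.protocol' literal flagged by SonarQube.
    GROUP_PROTOCOL_KEY = 'group.protocol'

    @staticmethod
    def update_conf_group_protocol(conf=None):
        if TestUtils.can_upgrade_group_protocol_to_consumer(conf):
            conf[GROUP_PROTOCOL_KEY] = 'consumer'

    @staticmethod
    def can_upgrade_group_protocol_to_consumer(conf):
        return (conf is not None and 'group.id' in conf and
                GROUP_PROTOCOL_KEY not in conf and
                TestUtils.use_group_protocol_consumer())

    @staticmethod
    def remove_forbidden_conf_group_protocol_consumer(conf):
        if (conf is None or
                not TestUtils.use_group_protocol_consumer() or
                conf.get(GROUP_PROTOCOL_KEY, 'consumer') != 'consumer'):
            return
        forbidden_conf_properties = ["session.timeout.ms",
                                     "partition.assignment.strategy",
                                     "heartbeat.interval.ms",
                                     "group.protocol.type"]
        for prop in forbidden_conf_properties:
            if prop in conf:
                print(f"Skipping setting forbidden configuration {prop} for `CONSUMER` protocol")
                del conf[prop]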
2 changes: 1 addition & 1 deletion tests/integration/admin/test_basic_operations.py
@@ -208,7 +208,7 @@ def test_basic_operations(kafka_cluster):
# Second iteration: create topic.
#
for validate in (True, False):
our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
our_topic = kafka_cluster.create_topic_and_wait_propagation(topic_prefix,
{
"num_partitions": num_partitions,
"config": topic_config,
6 changes: 3 additions & 3 deletions tests/integration/admin/test_delete_records.py
@@ -25,7 +25,7 @@ def test_delete_records(kafka_cluster):
admin_client = kafka_cluster.admin()

# Create a topic with a single partition
topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
topic = kafka_cluster.create_topic_and_wait_propagation("test-del-records",
{
"num_partitions": 1,
"replication_factor": 1,
@@ -73,12 +73,12 @@ def test_delete_records_multiple_topics_and_partitions(kafka_cluster):
admin_client = kafka_cluster.admin()
num_partitions = 3
# Create two topics with a single partition
topic = kafka_cluster.create_topic_and_wait_propogation("test-del-records",
topic = kafka_cluster.create_topic_and_wait_propagation("test-del-records",
{
"num_partitions": num_partitions,
"replication_factor": 1,
})
topic2 = kafka_cluster.create_topic_and_wait_propogation("test-del-records2",
topic2 = kafka_cluster.create_topic_and_wait_propagation("test-del-records2",
{
"num_partitions": num_partitions,
"replication_factor": 1,
2 changes: 1 addition & 1 deletion tests/integration/admin/test_describe_operations.py
@@ -204,7 +204,7 @@ def test_describe_operations(sasl_cluster):

# Create Topic
topic_config = {"compression.type": "gzip"}
our_topic = sasl_cluster.create_topic_and_wait_propogation(topic_prefix,
our_topic = sasl_cluster.create_topic_and_wait_propagation(topic_prefix,
{
"num_partitions": 1,
"config": topic_config,
4 changes: 2 additions & 2 deletions tests/integration/admin/test_incremental_alter_configs.py
@@ -56,13 +56,13 @@ def test_incremental_alter_configs(kafka_cluster):
num_partitions = 2
topic_config = {"compression.type": "gzip"}

our_topic = kafka_cluster.create_topic_and_wait_propogation(topic_prefix,
our_topic = kafka_cluster.create_topic_and_wait_propagation(topic_prefix,
{
"num_partitions": num_partitions,
"config": topic_config,
"replication_factor": 1,
})
our_topic2 = kafka_cluster.create_topic_and_wait_propogation(topic_prefix2,
our_topic2 = kafka_cluster.create_topic_and_wait_propagation(topic_prefix2,
{
"num_partitions": num_partitions,
"config": topic_config,
2 changes: 1 addition & 1 deletion tests/integration/admin/test_list_offsets.py
@@ -27,7 +27,7 @@ def test_list_offsets(kafka_cluster):
admin_client = kafka_cluster.admin()

# Create a topic with a single partition
topic = kafka_cluster.create_topic_and_wait_propogation("test-topic-verify-list-offsets",
topic = kafka_cluster.create_topic_and_wait_propagation("test-topic-verify-list-offsets",
{
"num_partitions": 1,
"replication_factor": 1,
19 changes: 16 additions & 3 deletions tests/integration/cluster_fixture.py
@@ -234,9 +234,22 @@ def create_topic(self, prefix, conf=None, **create_topic_kwargs):
future_topic.get(name).result()
return name

def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kwargs):
def delete_topic(self, topic):
Reviewer comment (Member): Same - is this intended to be added?
"""
Creates a new topic with this cluster. Wait for the topic to be propogated to all brokers.
Deletes a topic with this cluster.

:param str topic: topic name
"""
future = self.admin().delete_topics([topic])
try:
future.get(topic).result()
print("Topic {} deleted".format(topic))
except Exception as e:
print("Failed to delete topic {}: {}".format(topic, e))

def create_topic_and_wait_propagation(self, prefix, conf=None, **create_topic_kwargs):
"""
Creates a new topic with this cluster. Wait for the topic to be propagated to all brokers.

:param str prefix: topic name
:param dict conf: additions/overrides to topic configuration.
@@ -245,7 +258,7 @@ def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kw
"""
name = self.create_topic(prefix, conf, **create_topic_kwargs)

# wait for topic propogation across all the brokers.
# wait for topic propagation across all the brokers.
# FIXME: find a better way to wait for topic creation
# for all languages, given option to send request to
# a specific broker isn't present everywhere.
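On the FIXME above: one portable (if imperfect) wait is to poll cluster metadata until the topic is visible. A minimal sketch, assuming the fixture exposes a confluent_kafka AdminClient via self.admin(); note that list_topics() only proves the topic is visible to whichever broker answered the request, which is exactly the limitation the FIXME points at. The helper name is hypothetical.

    import time

    def wait_for_topic_visible(admin_client, name, timeout=30.0, interval=0.5):
        # Poll cluster metadata until the topic shows up without error.
        deadline = time.time() + timeout
        while time.time() < deadline:
            metadata = admin_client.list_topics(topic=name, timeout=5.0)
            topic_metadata = metadata.topics.get(name)
            if (topic_metadata is not None and topic_metadata.error is None
                    and len(topic_metadata.partitions) > 0):
                return
            time.sleep(interval)
        raise TimeoutError("topic {} not visible within {}s".format(name, timeout))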
8 changes: 4 additions & 4 deletions tests/integration/consumer/test_consumer_error.py
@@ -13,7 +13,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
# limitations under the License.
#

import pytest
@@ -29,7 +29,7 @@ def test_consume_error(kafka_cluster):
Tests to ensure librdkafka errors are propagated as
an instance of ConsumeError.
"""
topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction")
topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction")
consumer_conf = {'group.id': 'pytest', 'enable.partition.eof': True}

producer = kafka_cluster.producer()
@@ -59,7 +59,7 @@ def test_consume_error_commit(kafka_cluster):
"""
Tests to ensure that we handle messages with errors when commiting.
"""
topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction")
topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction")
consumer_conf = {'group.id': 'pytest',
'session.timeout.ms': 100}

@@ -91,7 +91,7 @@ def test_consume_error_store_offsets(kafka_cluster):
"""
Tests to ensure that we handle messages with errors when storing offsets.
"""
topic = kafka_cluster.create_topic_and_wait_propogation("test_commit_transaction")
topic = kafka_cluster.create_topic_and_wait_propagation("test_commit_transaction")
consumer_conf = {'group.id': 'pytest',
'session.timeout.ms': 100,
'enable.auto.offset.store': True,
4 changes: 2 additions & 2 deletions tests/integration/consumer/test_consumer_memberid.py
@@ -13,7 +13,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
# limitations under the License.

import pytest
from tests.common import TestUtils
@@ -28,7 +28,7 @@ def test_consumer_memberid(kafka_cluster):

topic = "testmemberid"

kafka_cluster.create_topic_and_wait_propogation(topic)
kafka_cluster.create_topic_and_wait_propagation(topic)

consumer = kafka_cluster.consumer(consumer_conf)

@@ -13,7 +13,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
# limitations under the License.

from confluent_kafka import TopicPartition

@@ -30,7 +30,7 @@ def commit_and_check(consumer, topic, metadata):


def test_consumer_topicpartition_metadata(kafka_cluster):
topic = kafka_cluster.create_topic_and_wait_propogation("test_topicpartition")
topic = kafka_cluster.create_topic_and_wait_propagation("test_topicpartition")
consumer_conf = {'group.id': 'test_topicpartition'}

c = kafka_cluster.consumer(consumer_conf)
128 changes: 128 additions & 0 deletions tests/integration/consumer/test_consumer_upgrade_downgrade.py
@@ -0,0 +1,128 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright 2025 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from enum import Enum
from confluent_kafka import ConsumerGroupType, KafkaException
from tests.common import TestUtils

topic_prefix = "test_consumer_upgrade_downgrade_"
number_of_partitions = 10


def get_group_protocol_type(a, group_id):
Reviewer comment (Member): Are those changes (adding test) also intended for this PR (fixing typo)?

futureMap = a.describe_consumer_groups([group_id])

SonarQube check warning on line 28 in tests/integration/consumer/test_consumer_upgrade_downgrade.py: Rename this local variable "futureMap" to match the regular expression ^[_a-z][a-z0-9_]*$.
try:
future = futureMap[group_id]
g = future.result()
return g.type
except KafkaException as e:
print("Error while describing group id '{}': {}".format(group_id, e))
except Exception:
raise

SonarQube check warning on line 36 in tests/integration/consumer/test_consumer_upgrade_downgrade.py: Add logic to this except clause or eliminate it and rethrow the exception automatically.
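Both SonarQube findings above could be addressed in one rewrite; a minimal sketch with unchanged behavior (it still returns None after logging a KafkaException), illustrative only:

    def get_group_protocol_type(a, group_id):
        # snake_case local name and no redundant re-raise clause, per the
        # SonarQube findings; otherwise identical to the version above.
        future_map = a.describe_consumer_groups([group_id])
        try:
            group_description = future_map[group_id].result()
            return group_description.type
        except KafkaException as e:
            print("Error while describing group id '{}': {}".format(group_id, e))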


def check_consumer(kafka_cluster, consumers, admin_client, group_id, topic, expected_protocol):
no_of_messages = 100
total_msg_read = 0
expected_partitions_per_consumer = number_of_partitions // len(consumers)
while len(consumers[-1].assignment()) != expected_partitions_per_consumer:
for consumer in consumers:
consumer.poll(0.1)

all_assignments = set()
for consumer in consumers:
assignment = consumer.assignment()
all_assignments.update(assignment)
assert len(assignment) == expected_partitions_per_consumer
assert len(all_assignments) == number_of_partitions

assert get_group_protocol_type(admin_client, group_id) == expected_protocol

# Produce some messages to the topic
test_data = ['test-data{}'.format(i) for i in range(0, no_of_messages)]
test_keys = ['test-key{}'.format(i) for i in range(0, no_of_messages)] # we want each partition to have data
kafka_cluster.seed_topic(topic, test_data, test_keys)

while total_msg_read < no_of_messages:
for consumer in consumers:
# Poll for messages
msg = consumer.poll(0.1)
if msg is not None:
total_msg_read += 1

assert total_msg_read == no_of_messages, f"Expected to read {no_of_messages} messages, but read {total_msg_read}"


class Operation(Enum):
ADD = 0
REMOVE = 1


def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(
kafka_cluster, partition_assignment_strategy):
"""
Test consumer upgrade and downgrade.
"""
topic_name_prefix = f"{topic_prefix}_{partition_assignment_strategy}"
topic = kafka_cluster.create_topic_and_wait_propagation(topic_name_prefix,
{
"num_partitions": number_of_partitions
})
admin_client = kafka_cluster.admin()

consumer_conf = {'group.id': topic,
'auto.offset.reset': 'earliest'}
consumer_conf_classic = {
'group.protocol': 'classic',
'partition.assignment.strategy': partition_assignment_strategy,
**consumer_conf
}
consumer_conf_consumer = {
'group.protocol': 'consumer',
**consumer_conf
}

test_scenarios = [(Operation.ADD, consumer_conf_classic, ConsumerGroupType.CLASSIC),
(Operation.ADD, consumer_conf_consumer, ConsumerGroupType.CONSUMER),
(Operation.REMOVE, None, ConsumerGroupType.CONSUMER),
(Operation.ADD, consumer_conf_classic, ConsumerGroupType.CONSUMER),
(Operation.REMOVE, None, ConsumerGroupType.CLASSIC)]
consumers = []

for operation, conf, expected_protocol in test_scenarios:
if operation == Operation.ADD:
consumer = kafka_cluster.consumer(conf)
assert consumer is not None
consumer.subscribe([topic])
consumers.append(consumer)
elif operation == Operation.REMOVE:
consumer_to_remove = consumers.pop(0)
consumer_to_remove.close()
check_consumer(kafka_cluster, consumers, admin_client, topic, topic, expected_protocol)

assert len(consumers) == 1
consumers[0].close()
kafka_cluster.delete_topic(topic)


@pytest.mark.skipif(not TestUtils.use_group_protocol_consumer(),
reason="Skipping test as group protocol consumer is not enabled")
def test_consumer_upgrade_downgrade(kafka_cluster):
perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'roundrobin')
perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'range')
perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'cooperative-sticky')
6 changes: 3 additions & 3 deletions tests/integration/consumer/test_cooperative_rebalance_1.py
@@ -13,7 +13,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
# limitations under the License.

import pytest
import time
@@ -60,8 +60,8 @@ def on_lost(self, consumer, partitions):

reb = RebalanceState()

topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1")
topic2 = kafka_cluster.create_topic_and_wait_propogation("topic2")
topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1")
topic2 = kafka_cluster.create_topic_and_wait_propagation("topic2")

consumer = kafka_cluster.consumer(consumer_conf)

4 changes: 2 additions & 2 deletions tests/integration/consumer/test_cooperative_rebalance_2.py
@@ -13,7 +13,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
# limitations under the License.

import pytest
import time
@@ -52,7 +52,7 @@ def on_revoke(self, consumer, partitions):

reb = RebalanceState()

topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1")
topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1")

consumer = kafka_cluster.consumer(consumer_conf)

6 changes: 3 additions & 3 deletions tests/integration/consumer/test_incremental_assign.py
@@ -13,7 +13,7 @@
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limit
# limitations under the License.

import pytest
from uuid import uuid1
@@ -30,8 +30,8 @@ def test_incremental_assign(kafka_cluster):
'enable.auto.commit': 'false',
'auto.offset.reset': 'error'}

topic1 = kafka_cluster.create_topic_and_wait_propogation("topic1")
topic2 = kafka_cluster.create_topic_and_wait_propogation("topic2")
topic1 = kafka_cluster.create_topic_and_wait_propagation("topic1")
topic2 = kafka_cluster.create_topic_and_wait_propagation("topic2")

kafka_cluster.seed_topic(topic1, value_source=[b'a'])
kafka_cluster.seed_topic(topic2, value_source=[b'b'])