Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase consumer group test timeout #187

Draft
wants to merge 12 commits into
base: master
Choose a base branch
from
13 changes: 1 addition & 12 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,8 @@ jobs:
fail-fast: false
matrix:
python-version:
- "3.8"
- "3.9"
- "3.10"
- "3.11"
- "3.12"
- "pypy3.9"
experimental: [ false ]
steps:
- name: Checkout the source code
Expand Down Expand Up @@ -116,19 +112,12 @@ jobs:
fail-fast: false
matrix:
kafka-version:
- "0.9.0.1"
- "0.10.2.2"
- "0.11.0.2"
- "0.11.0.3"
- "1.1.1"
- "2.4.0"
- "2.5.0"
- "2.6.0"
python-version: ['3.12']
experimental: [false]
include:
- kafka-version: '0.8.2.2'
experimental: true
experimental: false
python-version: "3.12"
- kafka-version: '0.8.2.2'
experimental: false
Expand Down
11 changes: 11 additions & 0 deletions envvar
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash
# Export the environment variables needed to run the Kafka integration
# tests.  Must be *sourced* (". envvar") so the exports land in the
# caller's shell rather than a throwaway subshell.

# Heuristic sourced-vs-executed guard: a directly executed script normally
# gets its own shell level and no interactive prompt.
# NOTE(review): SHLVL/PS1 are approximate indicators and vary by shell and
# environment — confirm this guard behaves as intended in CI shells.
if [ "$SHLVL" != "1" ] || [ "$PS1" == "" ]; then
    echo "Error: Please source, not run. Example '. $(basename "$0")'"
    exit 1
fi

export KAFKA_VERSION=2.6.0
export PATH=${HOME}/.venvs/bin:${PATH}
# Pulls a local JDK onto PATH; sourced so its exports persist here too.
. travis_java_install.sh

19 changes: 19 additions & 0 deletions first-setup-ubuntu
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash
# One-time Ubuntu workstation setup for running the Kafka integration tests:
# installs system packages, creates a private virtualenv, builds the package,
# and fetches/builds the Kafka broker fixtures.
set -e  # stop at the first failing step instead of silently continuing

export KAFKA_VERSION=2.6.0
sudo apt-get update
sudo apt-get -y install python3 python3-pip python3-venv openjdk-11-jre libsnappy-dev libzstd-dev python3-full

# Private venv; prepend it so every python/pip call below resolves into it.
mkdir -p "${HOME}/.venvs"
python3 -m venv "${HOME}/.venvs"
export PATH=${HOME}/.venvs/bin:${PATH}

# Use "python -m pip" consistently so the venv interpreter is always the
# one installing (plain pip3 can resolve to the system interpreter).
python -m pip install build
python -m build
# Sourced so the JDK PATH export persists for build_integration.sh.
. travis_java_install.sh
./build_integration.sh
python -m pip install tox

python -m pip install .
python -m pip install -r requirements-dev.txt

13 changes: 13 additions & 0 deletions java-temurin
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash
# Download and unpack a Temurin JDK 11 and put its bin/ on PATH.
# NOTE(review): the PATH export only persists in the calling shell if this
# script is *sourced*; when executed directly the export dies with the
# subshell — confirm how this is invoked.
set -e

# Name the tarball once so the wget/tar pair cannot drift apart.
JDK_TARBALL=OpenJDK11U-jdk_x64_linux_hotspot_11.0.12_7.tar.gz
wget "https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.12%2B7/${JDK_TARBALL}"
tar xf "${JDK_TARBALL}"

# wget https://github.com/adoptium/temurin8-binaries/releases/download/jdk8u422-b05/OpenJDK8U-jre_x64_linux_hotspot_8u422b05.tar.gz
# tar xf OpenJDK8U-jre_x64_linux_hotspot_8u422b05.tar.gz

# Enter the unpacked JDK just long enough to capture its absolute bin path
# (PWD is absolute, so PATH stays valid after popd).
pushd .
cd jdk-11.0.12+7
# cd jdk8u422-b05-jre/
export PATH="${PWD}/bin:${PATH}"
popd
1 change: 1 addition & 0 deletions kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ def least_loaded_node(self):
node_id or None if no suitable node was found
"""
nodes = [broker.nodeId for broker in self.cluster.brokers()]
log.debug("least_loaded_node %r", nodes)
random.shuffle(nodes)

inflight = float('inf')
Expand Down
13 changes: 13 additions & 0 deletions kill-extra-java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/bash
# Watchdog: once per second, look for Kafka broker JVMs (command line
# containing the literal "kafka.pro") and, when two or more are running,
# kill the first-listed one.  Used to clean up leaked brokers during
# repeated integration-test runs.

# Use a mktemp file instead of littering ./kafka-pids in the CWD.
pidfile=$(mktemp)
trap 'rm -f "$pidfile"' EXIT

while true; do
    # Escape the dot: we want the literal "kafka.pro", not "kafka<any>pro".
    ps auxf | grep -v grep | grep 'kafka\.pro' | awk '{print $2}' > "$pidfile"
    kafka_count=$(wc -l < "$pidfile")
    if [ "$kafka_count" -ge 2 ]; then
        kafka_kill_pid=$(head -n 1 "$pidfile")
        date "+%F %T " | tr -d "\n"
        echo "Detected $kafka_count Kafka, killing $kafka_kill_pid"
        kill "$kafka_kill_pid"
    fi
    sleep 1
done
8 changes: 8 additions & 0 deletions repeat-tox
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#!/bin/bash
# Run the tox suite in an endless loop (with a short pause between runs)
# to shake out flaky tests.  Because of `set -e`, the loop stops the first
# time tox exits non-zero; `-x` echoes each command as it runs.
set -e -x

while true; do
    tox
    sleep 10
done

12 changes: 12 additions & 0 deletions setup-after-git-clean
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#!/bin/bash
# Rebuild the local test environment after a `git clean`: rebuild the
# package, re-fetch the Kafka fixtures, and reinstall into the dev venv.
set -e  # later steps depend on earlier ones; stop on the first failure

export KAFKA_VERSION=2.6.0
export PATH=${HOME}/.venvs/bin:${PATH}

python -m build
# Sourced so the JDK PATH export persists for build_integration.sh.
. travis_java_install.sh
./build_integration.sh

pip install .
pip install -r requirements-dev.txt

53 changes: 36 additions & 17 deletions test/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,16 @@ def render_template(cls, source_file, target_file, binding):
dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
os.fsync(dirfd)
os.close(dirfd)
log.debug("Template string:")
for line in template.splitlines():
log.debug(' ' + line.strip())
log.debug("Rendered template:")
with open(target_file.strpath, 'r') as o:
for line in o:
log.debug(' ' + line.strip())
log.debug("binding:")
for key, value in binding.items():
log.debug(" {key}={value}".format(key=key, value=value))
#log.debug("Template string:")
#for line in template.splitlines():
# log.debug(' ' + line.strip())
#log.debug("Rendered template:")
#with open(target_file.strpath, 'r') as o:
# for line in o:
# log.debug(' ' + line.strip())
#log.debug("binding:")
#for key, value in binding.items():
# log.debug(" {key}={value}".format(key=key, value=value))

def dump_logs(self):
self.child.dump_logs()
Expand Down Expand Up @@ -203,12 +203,13 @@ def open(self):
env = self.kafka_run_class_env()

# Party!
timeout = 5
max_timeout = 120
timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
max_timeout = 120 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
orange_start = time.monotonic()
while time.time() < end_at:
if auto_port:
self.port = get_open_port()
Expand All @@ -217,8 +218,11 @@ def open(self):
self.child = SpawnedService(args, env)
self.child.start()
timeout = min(timeout, max(end_at - time.time(), 0))
if self.child.wait_for(r"binding to port", timeout=timeout):
break
try:
if self.child.wait_for(r"binding to port", timeout=timeout):
break
except RuntimeError:
self.child.join()
self.child.dump_logs()
self.child.stop()
timeout *= 2
Expand All @@ -227,6 +231,8 @@ def open(self):
backoff += 1
else:
raise RuntimeError('Failed to start Zookeeper before max_timeout')
with open("/tmp/orange", "a") as orange_f:
orange_f.write("open " + str(time.monotonic() - orange_start) + "\n")
self.out("Done!")
atexit.register(self.close)

Expand Down Expand Up @@ -358,6 +364,7 @@ def _add_scram_user(self):
)
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
log.info("PID %r args %r", proc.pid, args)

stdout, stderr = proc.communicate()

Expand Down Expand Up @@ -394,6 +401,7 @@ def _create_zk_chroot(self):
"kafka-python")
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
log.info("PID %r args %r", proc.pid, args)

stdout, stderr = proc.communicate()

Expand Down Expand Up @@ -421,12 +429,13 @@ def start(self):
env['KAFKA_OPTS'] = opts
self.render_template(jaas_conf_template, jaas_conf, vars(self))

timeout = 5
max_timeout = 120
timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
max_timeout = 120 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
backoff = 1
end_at = time.time() + max_timeout
tries = 1
auto_port = (self.port is None)
orange_start = time.monotonic()
while time.time() < end_at:
# We have had problems with port conflicts on travis
# so we will try a different port on each retry
Expand All @@ -451,14 +460,21 @@ def start(self):
backoff += 1
else:
raise RuntimeError('Failed to start KafkaInstance before max_timeout')
with open("/tmp/orange", "a") as orange_f:
orange_f.write("start " + str(time.monotonic() - orange_start) + "\n")

(self._client,) = self.get_clients(1, client_id='_internal_client')

self.out("Done!")
self.running = True

def _broker_ready(self, timeout):
return self.child.wait_for(self.start_pattern, timeout=timeout)
#return self.child.wait_for(self.start_pattern, timeout=timeout)
try:
orange_w = self.child.wait_for(self.start_pattern, timeout=timeout)
return orange_w
except RuntimeError:
self.child.join()

def _scram_user_present(self, timeout):
# no need to wait for scram user if scram is not used
Expand Down Expand Up @@ -602,6 +618,7 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
args.append('--if-not-exists')
env = self.kafka_run_class_env()
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
log.info("PID %r args %r", proc.pid, args)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
if 'kafka.common.TopicExistsException' not in stdout:
Expand All @@ -620,6 +637,7 @@ def get_topic_names(self):
env = self.kafka_run_class_env()
env.pop('KAFKA_LOG4J_OPTS')
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
log.info("PID %r args %r", proc.pid, args)
stdout, stderr = proc.communicate()
if proc.returncode != 0:
self.out("Failed to list topics!")
Expand All @@ -637,6 +655,7 @@ def _enrich_client_params(self, params, **defaults):
for key, value in defaults.items():
params.setdefault(key, value)
params.setdefault('bootstrap_servers', self.bootstrap_server())
params.setdefault('api_version_auto_timeout_ms', KafkaClient.DEFAULT_CONFIG.get('api_version_auto_timeout_ms', 2000) * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1)))
if self.sasl_enabled:
params.setdefault('sasl_mechanism', self.sasl_mechanism)
params.setdefault('security_protocol', self.transport)
Expand Down
33 changes: 23 additions & 10 deletions test/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,28 @@ def __init__(self, args=None, env=None):
self.daemon = True
log.info("Created service for command:")
log.info(" "+' '.join(self.args))
log.debug("With environment:")
for key, value in self.env.items():
log.debug(" {key}={value}".format(key=key, value=value))
#log.debug("With environment:")
#for key, value in self.env.items():
# log.debug(" {key}={value}".format(key=key, value=value))

def _spawn(self):
    """Start the child process for ``self.args`` in its own session,
    unless a live child already exists, and record liveness in
    ``self.alive``.

    Raises whatever ``subprocess.Popen`` raises when the spawn fails.
    """
    if self.alive:
        return
    if self.child and self.child.poll() is None:
        return

    log.info("spawning")
    try:
        self.child = subprocess.Popen(
            self.args,
            preexec_fn=os.setsid,  # to avoid propagating signals
            env=self.env,
            bufsize=1,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    except Exception as e:
        # Fix: log message had a typo ("swapning"), and swallowing the
        # error here left self.child stale/unset and crashed on the
        # .pid access below — log and re-raise instead.
        log.warning("spawning error %r", e)
        raise
    log.info("PID %r args %r", self.child.pid, self.args)
    self.alive = self.child.poll() is None
    log.info("alive is %r", self.alive)

def _despawn(self):
if self.child.poll() is None:
Expand All @@ -82,6 +88,7 @@ def run(self):
try:
(rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
except select.error as ex:
log.warning("select error: %r", ex)
if ex.args[0] == 4:
continue
else:
Expand All @@ -99,20 +106,26 @@ def run(self):

if self.child.poll() is not None:
self.dump_logs()
log.warning("dump log and break")
break

if self.should_die.is_set():
self._despawn()
log.warning("despawn and break")
break

def dump_logs(self):
    """Replay everything captured from the child's stderr and stdout onto
    our own stderr/stdout, and append the same text to /tmp/orange.

    NOTE(review): the hard-coded /tmp/orange append looks like temporary
    debug instrumentation from this draft PR — confirm before merging.
    """
    err_text = '\n'.join(self.captured_stderr)
    out_text = '\n'.join(self.captured_stdout)
    sys.stderr.write(err_text)
    sys.stdout.write(out_text)
    with open("/tmp/orange", "a") as orange_f:
        orange_f.write(err_text)
        orange_f.write(out_text)

def wait_for(self, pattern, timeout=30):
start = time.time()
while True:
if not self.is_alive():
self.join()
raise RuntimeError("Child thread died already.")

elapsed = time.time() - start
Expand Down
23 changes: 23 additions & 0 deletions test/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,37 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_cli
ConfigResource(ConfigResourceType.BROKER, broker_id)])

assert len(configs) == 2
retry = False

for config in configs:
assert (config.resources[0][2] == ConfigResourceType.TOPIC
and config.resources[0][3] == topic) or \
(config.resources[0][2] == ConfigResourceType.BROKER
and config.resources[0][3] == str(broker_id))
#assert len(config.resources[0][4]) > 1
if not (len(config.resources[0][4]) > 1):
retry = True

if not retry:
return

sleep(10)

broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
configs = kafka_admin_client.describe_configs([
ConfigResource(ConfigResourceType.TOPIC, topic),
ConfigResource(ConfigResourceType.BROKER, broker_id)])

assert len(configs) == 2
for config in configs:
assert (config.resources[0][2] == ConfigResourceType.TOPIC
and config.resources[0][3] == topic) or \
(config.resources[0][2] == ConfigResourceType.BROKER
and config.resources[0][3] == str(broker_id))
assert len(config.resources[0][4]) > 1

assert False


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
Expand Down
2 changes: 1 addition & 1 deletion test/test_consumer_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def test_consumer(kafka_broker, topic):
def test_consumer_topics(kafka_broker, topic):
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
# Necessary to drive the IO
consumer.poll(500)
consumer.poll(5000)
assert topic in consumer.topics()
assert len(consumer.partitions_for_topic(topic)) > 0
consumer.close()
Expand Down
Loading
Loading