From 8f440f58a719a2afa35c70c834542f926d7e9efa Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Fri, 9 Aug 2024 04:40:50 +0000 Subject: [PATCH 01/12] [PLEASE IGNORE] Trying to trigger test to find out cause of failure --- test/fixtures.py | 14 ++++++++++---- tox.ini | 26 +++++--------------------- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 998dc429f..e7dd8ae22 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -203,12 +203,13 @@ def open(self): env = self.kafka_run_class_env() # Party! - timeout = 5 - max_timeout = 120 + timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1)) + max_timeout = 120 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1)) backoff = 1 end_at = time.time() + max_timeout tries = 1 auto_port = (self.port is None) + orange_start = time.monotonic() while time.time() < end_at: if auto_port: self.port = get_open_port() @@ -227,6 +228,8 @@ def open(self): backoff += 1 else: raise RuntimeError('Failed to start Zookeeper before max_timeout') + with open("/tmp/orange", "w") as orange_f: + orange_f.write("open " + str(time.monotonic() - orange_start) + "\n") self.out("Done!") atexit.register(self.close) @@ -421,12 +424,13 @@ def start(self): env['KAFKA_OPTS'] = opts self.render_template(jaas_conf_template, jaas_conf, vars(self)) - timeout = 5 - max_timeout = 120 + timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1)) + max_timeout = 120 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1)) backoff = 1 end_at = time.time() + max_timeout tries = 1 auto_port = (self.port is None) + orange_start = time.monotonic() while time.time() < end_at: # We have had problems with port conflicts on travis # so we will try a different port on each retry @@ -451,6 +455,8 @@ def start(self): backoff += 1 else: raise RuntimeError('Failed to start KafkaInstance before max_timeout') + with open("/tmp/orange", "w") as orange_f: + orange_f.write("start " + str(time.monotonic() - orange_start) + "\n") (self._client,) = self.get_clients(1, client_id='_internal_client') diff --git a/tox.ini b/tox.ini index a574dc136..9d379ca1c 100644 --- a/tox.ini +++ b/tox.ini @@ -1,19 +1,14 @@ [tox] -envlist = py{38,39,310,311,312,py}, docs +envlist = py312 [pytest] -testpaths = kafka test +testpaths = test/test_ssl_integration.py addopts = --durations=10 log_format = %(created)f %(filename)-23s %(threadName)s %(message)s [gh-actions] python = - 3.8: py38 - 3.9: py39 - 3.10: py310 - 3.11: py311 3.12: py312 - pypy-3.9: pypy [testenv] deps = @@ -30,22 +25,11 @@ deps = crc32c botocore commands = - pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka} + pytest {posargs:--cov=kafka -vvv --tb=long --capture=sys --show-capture=all --log-level=notset --log-cli-level=notset --log-file-level=notset} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} + RETRY_TIMEOUT_MULTIPLIER = 8 + MAX_TIMEOUT_MULTIPLIER = 1 passenv = KAFKA_VERSION - -[testenv:pypy] -# pylint is super slow on pypy... 
-commands = pytest {posargs:--cov=kafka} - -[testenv:docs] -deps = - sphinx_rtd_theme - sphinx - -commands = - sphinx-apidoc -o docs/apidoc/ kafka/ - sphinx-build -b html docs/ docs/_build From e38b9ab5981c608761363837f87fac849d0dddd0 Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Mon, 12 Aug 2024 05:26:39 +0000 Subject: [PATCH 02/12] debug: trying to find out why subprocess dies --- kafka/client_async.py | 1 + test/fixtures.py | 36 +++++++++++++++++++++++------------- test/service.py | 10 +++++++--- test/test_ssl_integration.py | 4 ++-- tox.ini | 2 +- 5 files changed, 34 insertions(+), 19 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 984cd81fb..2dc6a1f56 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -754,6 +754,7 @@ def least_loaded_node(self): node_id or None if no suitable node was found """ nodes = [broker.nodeId for broker in self.cluster.brokers()] + log.debug("least_loaded_node %r", nodes) random.shuffle(nodes) inflight = float('inf') diff --git a/test/fixtures.py b/test/fixtures.py index e7dd8ae22..8aff66c1c 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -141,16 +141,16 @@ def render_template(cls, source_file, target_file, binding): dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY) os.fsync(dirfd) os.close(dirfd) - log.debug("Template string:") - for line in template.splitlines(): - log.debug(' ' + line.strip()) - log.debug("Rendered template:") - with open(target_file.strpath, 'r') as o: - for line in o: - log.debug(' ' + line.strip()) - log.debug("binding:") - for key, value in binding.items(): - log.debug(" {key}={value}".format(key=key, value=value)) + #log.debug("Template string:") + #for line in template.splitlines(): + # log.debug(' ' + line.strip()) + #log.debug("Rendered template:") + #with open(target_file.strpath, 'r') as o: + # for line in o: + # log.debug(' ' + line.strip()) + #log.debug("binding:") + #for key, value in binding.items(): + # log.debug(" {key}={value}".format(key=key, value=value)) def dump_logs(self): self.child.dump_logs() @@ -228,7 +228,7 @@ def open(self): backoff += 1 else: raise RuntimeError('Failed to start Zookeeper before max_timeout') - with open("/tmp/orange", "w") as orange_f: + with open("/tmp/orange", "a") as orange_f: orange_f.write("open " + str(time.monotonic() - orange_start) + "\n") self.out("Done!") atexit.register(self.close) @@ -361,6 +361,7 @@ def _add_scram_user(self): ) env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + log.info("PID %r args %r", proc.pid, args) stdout, stderr = proc.communicate() @@ -397,6 +398,7 @@ def _create_zk_chroot(self): "kafka-python") env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + log.info("PID %r args %r", proc.pid, args) stdout, stderr = proc.communicate() @@ -455,7 +457,7 @@ def start(self): backoff += 1 else: raise RuntimeError('Failed to start KafkaInstance before max_timeout') - with open("/tmp/orange", "w") as orange_f: + with open("/tmp/orange", "a") as orange_f: orange_f.write("start " + str(time.monotonic() - orange_start) + "\n") (self._client,) = self.get_clients(1, client_id='_internal_client') @@ -464,7 +466,12 @@ def start(self): self.running = True def _broker_ready(self, timeout): - return self.child.wait_for(self.start_pattern, timeout=timeout) + #return self.child.wait_for(self.start_pattern, timeout=timeout) + try: + orange_w = 
self.child.wait_for(self.start_pattern, timeout=timeout) + return orange_w + except RuntimeError: + self.child.join() def _scram_user_present(self, timeout): # no need to wait for scram user if scram is not used @@ -608,6 +615,7 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor): args.append('--if-not-exists') env = self.kafka_run_class_env() proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + log.info("PID %r args %r", proc.pid, args) stdout, stderr = proc.communicate() if proc.returncode != 0: if 'kafka.common.TopicExistsException' not in stdout: @@ -626,6 +634,7 @@ def get_topic_names(self): env = self.kafka_run_class_env() env.pop('KAFKA_LOG4J_OPTS') proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + log.info("PID %r args %r", proc.pid, args) stdout, stderr = proc.communicate() if proc.returncode != 0: self.out("Failed to list topics!") @@ -643,6 +652,7 @@ def _enrich_client_params(self, params, **defaults): for key, value in defaults.items(): params.setdefault(key, value) params.setdefault('bootstrap_servers', self.bootstrap_server()) + params.setdefault('api_version_auto_timeout_ms', KafkaClient.DEFAULT_CONFIG.get('api_version_auto_timeout_ms', 2000) * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))) if self.sasl_enabled: params.setdefault('sasl_mechanism', self.sasl_mechanism) params.setdefault('security_protocol', self.transport) diff --git a/test/service.py b/test/service.py index 045d780e7..a43a651bf 100644 --- a/test/service.py +++ b/test/service.py @@ -47,9 +47,9 @@ def __init__(self, args=None, env=None): self.daemon = True log.info("Created service for command:") log.info(" "+' '.join(self.args)) - log.debug("With environment:") - for key, value in self.env.items(): - log.debug(" {key}={value}".format(key=key, value=value)) + #log.debug("With environment:") + #for key, value in self.env.items(): + # log.debug(" {key}={value}".format(key=key, value=value)) def _spawn(self): if self.alive: return @@ -62,6 +62,7 @@ def _spawn(self): bufsize=1, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + log.info("PID %r args %r", self.child.pid, self.args) self.alive = self.child.poll() is None def _despawn(self): @@ -108,6 +109,9 @@ def run(self): def dump_logs(self): sys.stderr.write('\n'.join(self.captured_stderr)) sys.stdout.write('\n'.join(self.captured_stdout)) + with open("/tmp/orange", "a") as orange_f: + orange_f.write('\n'.join(self.captured_stderr)) + orange_f.write('\n'.join(self.captured_stdout)) def wait_for(self, pattern, timeout=30): start = time.time() diff --git a/test/test_ssl_integration.py b/test/test_ssl_integration.py index 8453e7831..b33fc81a6 100644 --- a/test/test_ssl_integration.py +++ b/test/test_ssl_integration.py @@ -22,7 +22,7 @@ def test_admin(request, ssl_kafka): @pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Inter broker SSL was implemented at version 0.9") -def test_produce_and_consume(request, ssl_kafka): +def xtest_produce_and_consume(request, ssl_kafka): topic_name = special_to_underscore(request.node.name + random_string(4)) ssl_kafka.create_topics([topic_name], num_partitions=2) producer, = ssl_kafka.get_producers(1) @@ -50,7 +50,7 @@ def test_produce_and_consume(request, ssl_kafka): @pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Inter broker SSL was implemented at version 0.9") -def test_client(request, ssl_kafka): +def xtest_client(request, ssl_kafka): topic_name = special_to_underscore(request.node.name + 
random_string(4)) ssl_kafka.create_topics([topic_name], num_partitions=1) diff --git a/tox.ini b/tox.ini index 9d379ca1c..12b1f7006 100644 --- a/tox.ini +++ b/tox.ini @@ -30,6 +30,6 @@ setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} RETRY_TIMEOUT_MULTIPLIER = 8 - MAX_TIMEOUT_MULTIPLIER = 1 + MAX_TIMEOUT_MULTIPLIER = 8 passenv = KAFKA_VERSION From 3796c2d07afab6ac03855cb532df823bbadbb466 Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Mon, 12 Aug 2024 06:49:51 +0000 Subject: [PATCH 03/12] mroe debug message --- test/fixtures.py | 7 +++++-- test/service.py | 25 +++++++++++++++++-------- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/test/fixtures.py b/test/fixtures.py index 8aff66c1c..31c201700 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -218,8 +218,11 @@ def open(self): self.child = SpawnedService(args, env) self.child.start() timeout = min(timeout, max(end_at - time.time(), 0)) - if self.child.wait_for(r"binding to port", timeout=timeout): - break + try: + if self.child.wait_for(r"binding to port", timeout=timeout): + break + except RuntimeError: + self.child.join() self.child.dump_logs() self.child.stop() timeout *= 2 diff --git a/test/service.py b/test/service.py index a43a651bf..86559aa30 100644 --- a/test/service.py +++ b/test/service.py @@ -55,15 +55,20 @@ def _spawn(self): if self.alive: return if self.child and self.child.poll() is None: return - self.child = subprocess.Popen( - self.args, - preexec_fn=os.setsid, # to avoid propagating signals - env=self.env, - bufsize=1, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) + log.info("spawning") + try: + self.child = subprocess.Popen( + self.args, + preexec_fn=os.setsid, # to avoid propagating signals + env=self.env, + bufsize=1, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + except Exception as e: + log.warning("swapning error %r", e) log.info("PID %r args %r", self.child.pid, self.args) self.alive = self.child.poll() is None + log.info("alive is %r", self.alive) def _despawn(self): if self.child.poll() is None: @@ -83,6 +88,7 @@ def run(self): try: (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1) except select.error as ex: + log.warning("select error: %r", ex) if ex.args[0] == 4: continue else: @@ -100,10 +106,12 @@ def run(self): if self.child.poll() is not None: self.dump_logs() + log.warning("dump log and break") break if self.should_die.is_set(): self._despawn() + log.warning("despawn and break") break def dump_logs(self): @@ -117,7 +125,8 @@ def wait_for(self, pattern, timeout=30): start = time.time() while True: if not self.is_alive(): - raise RuntimeError("Child thread died already.") + self.join() + #raise RuntimeError("Child thread died already.") elapsed = time.time() - start if elapsed >= timeout: From e7e22fd865e2bc47538a95338c0bd75064cdf827 Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Tue, 13 Aug 2024 03:32:06 +0000 Subject: [PATCH 04/12] debug --- test/service.py | 2 +- test/test_admin_integration.py | 23 +++++++++++++++++++++++ test/test_consumer_group.py | 2 +- tox.ini | 9 +++++---- 4 files changed, 30 insertions(+), 6 deletions(-) diff --git a/test/service.py b/test/service.py index 86559aa30..e61dbbaf0 100644 --- a/test/service.py +++ b/test/service.py @@ -126,7 +126,7 @@ def wait_for(self, pattern, timeout=30): while True: if not self.is_alive(): self.join() - #raise RuntimeError("Child thread died already.") + raise RuntimeError("Child thread died already.") elapsed = time.time() - start if elapsed >= timeout: diff --git 
a/test/test_admin_integration.py b/test/test_admin_integration.py index 0eb06b18d..436ee8f88 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -126,7 +126,28 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_cli ConfigResource(ConfigResourceType.BROKER, broker_id)]) assert len(configs) == 2 + retry = False + for config in configs: + assert (config.resources[0][2] == ConfigResourceType.TOPIC + and config.resources[0][3] == topic) or \ + (config.resources[0][2] == ConfigResourceType.BROKER + and config.resources[0][3] == str(broker_id)) + #assert len(config.resources[0][4]) > 1 + if not (len(config.resources[0][4]) > 1): + retry = True + + if not retry: + return + + sleep(10) + + broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId + configs = kafka_admin_client.describe_configs([ + ConfigResource(ConfigResourceType.TOPIC, topic), + ConfigResource(ConfigResourceType.BROKER, broker_id)]) + + assert len(configs) == 2 for config in configs: assert (config.resources[0][2] == ConfigResourceType.TOPIC and config.resources[0][3] == topic) or \ @@ -134,6 +155,8 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_cli and config.resources[0][3] == str(broker_id)) assert len(config.resources[0][4]) > 1 + assert False + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index abd0cfe09..00de14f73 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -34,7 +34,7 @@ def test_consumer(kafka_broker, topic): def test_consumer_topics(kafka_broker, topic): consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker)) # Necessary to drive the IO - consumer.poll(500) + consumer.poll(5000) assert topic in consumer.topics() assert len(consumer.partitions_for_topic(topic)) > 0 consumer.close() diff --git a/tox.ini b/tox.ini index 12b1f7006..044f067a8 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,8 @@ envlist = py312 [pytest] -testpaths = test/test_ssl_integration.py +#testpaths = test/test_ssl_integration.py +testpaths = test addopts = --durations=10 log_format = %(created)f %(filename)-23s %(threadName)s %(message)s @@ -25,11 +26,11 @@ deps = crc32c botocore commands = - pytest {posargs:--cov=kafka -vvv --tb=long --capture=sys --show-capture=all --log-level=notset --log-cli-level=notset --log-file-level=notset} + pytest {posargs:--cov=kafka -vvv -x --junitxml=junit.xml --tb=long --capture=sys --show-capture=all --log-level=notset --log-cli-level=notset --log-file-level=notset} setenv = CRC32C_SW_MODE = auto PROJECT_ROOT = {toxinidir} - RETRY_TIMEOUT_MULTIPLIER = 8 - MAX_TIMEOUT_MULTIPLIER = 8 + RETRY_TIMEOUT_MULTIPLIER = 1024 + MAX_TIMEOUT_MULTIPLIER = 1024 passenv = KAFKA_VERSION From 8fe1e7ca9cf725e2829c900c9e5270fb5dc9c52b Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Tue, 13 Aug 2024 03:32:38 +0000 Subject: [PATCH 05/12] Add repeat-tox --- repeat-tox | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100755 repeat-tox diff --git a/repeat-tox b/repeat-tox new file mode 100755 index 000000000..c2fb853f1 --- /dev/null +++ b/repeat-tox @@ -0,0 +1,8 @@ +#!/bin/bash +set -e -x + +while [ true ]; do + tox + sleep 10 +done + From 77b75554f2fb787cbca48716a4ef79e522a5e2ce Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Tue, 13 Aug 2024 04:09:57 +0000 Subject: [PATCH 06/12] Add 
scripts to setup test environment automatically --- envvar | 11 +++++++++++ first-setup-ubuntu | 19 +++++++++++++++++++ setup-after-git-clean | 12 ++++++++++++ 3 files changed, 42 insertions(+) create mode 100755 envvar create mode 100755 first-setup-ubuntu create mode 100755 setup-after-git-clean diff --git a/envvar b/envvar new file mode 100755 index 000000000..eae974156 --- /dev/null +++ b/envvar @@ -0,0 +1,11 @@ +#!/bin/bash + +if [ "$SHLVL" != "1" ] || [ "$PS1" == "" ]; then + echo "Error: Please source, not run. Example '. $(basename $0)'" + exit 1 +fi + +export KAFKA_VERSION=2.6.0 +export PATH=${HOME}/.venvs/bin:${PATH} +. travis_java_install.sh + diff --git a/first-setup-ubuntu b/first-setup-ubuntu new file mode 100755 index 000000000..6b9314846 --- /dev/null +++ b/first-setup-ubuntu @@ -0,0 +1,19 @@ +#!/bin/bash + +export KAFKA_VERSION=2.6.0 +sudo apt-get update +sudo apt-get -y install python3 python3-pip python3-venv openjdk-11-jre libsnappy-dev libzstd-dev python3-full + +mkdir -p ${HOME}/.venvs +python3 -m venv ${HOME}/.venvs +export PATH=${HOME}/.venvs/bin:${PATH} + +pip3 install build +python -m build +. travis_java_install.sh +./build_integration.sh +python -m pip install tox + +pip install . +pip install -r requirements-dev.txt + diff --git a/setup-after-git-clean b/setup-after-git-clean new file mode 100755 index 000000000..81570db12 --- /dev/null +++ b/setup-after-git-clean @@ -0,0 +1,12 @@ +#!/bin/bash + +export KAFKA_VERSION=2.6.0 +export PATH=${HOME}/.venvs/bin:${PATH} + +python -m build +. travis_java_install.sh +./build_integration.sh + +pip install . +pip install -r requirements-dev.txt + From f76b6d4f9ed89833185dd3413dd93bc03fed943b Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Fri, 16 Aug 2024 00:05:15 +0000 Subject: [PATCH 07/12] use Kafka 0.8.2.2 (for 15 minutes timeout issue) --- envvar | 2 +- first-setup-ubuntu | 2 +- setup-after-git-clean | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/envvar b/envvar index eae974156..656f4114d 100755 --- a/envvar +++ b/envvar @@ -5,7 +5,7 @@ if [ "$SHLVL" != "1" ] || [ "$PS1" == "" ]; then exit 1 fi -export KAFKA_VERSION=2.6.0 +export KAFKA_VERSION=0.8.2.2 export PATH=${HOME}/.venvs/bin:${PATH} . 
travis_java_install.sh diff --git a/first-setup-ubuntu b/first-setup-ubuntu index 6b9314846..1fab8ee8c 100755 --- a/first-setup-ubuntu +++ b/first-setup-ubuntu @@ -1,6 +1,6 @@ #!/bin/bash -export KAFKA_VERSION=2.6.0 +export KAFKA_VERSION=0.8.2.2 sudo apt-get update sudo apt-get -y install python3 python3-pip python3-venv openjdk-11-jre libsnappy-dev libzstd-dev python3-full diff --git a/setup-after-git-clean b/setup-after-git-clean index 81570db12..9720f201f 100755 --- a/setup-after-git-clean +++ b/setup-after-git-clean @@ -1,6 +1,6 @@ #!/bin/bash -export KAFKA_VERSION=2.6.0 +export KAFKA_VERSION=0.8.2.2 export PATH=${HOME}/.venvs/bin:${PATH} python -m build From 56685ba0fc837aea0ff18cd199e5dfc31cab8344 Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Fri, 16 Aug 2024 01:31:32 +0000 Subject: [PATCH 08/12] Add script to get temurin java --- java-temurin | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100755 java-temurin diff --git a/java-temurin b/java-temurin new file mode 100755 index 000000000..a8795b669 --- /dev/null +++ b/java-temurin @@ -0,0 +1,13 @@ +#!/bin/bash + +wget https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.12%2B7/OpenJDK11U-jdk_x64_linux_hotspot_11.0.12_7.tar.gz +tar xf OpenJDK11U-jdk_x64_linux_hotspot_11.0.12_7.tar.gz + +# wget https://github.com/adoptium/temurin8-binaries/releases/download/jdk8u422-b05/OpenJDK8U-jre_x64_linux_hotspot_8u422b05.tar.gz +# tar xf OpenJDK8U-jre_x64_linux_hotspot_8u422b05.tar.gz + +pushd . +cd jdk-11.0.12+7 +# cd jdk8u422-b05-jre/ +export PATH="${PWD}/bin:${PATH}" +popd From dc4c57e726341926ff34f478d9387e4a3f6b0ff0 Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Fri, 16 Aug 2024 01:42:31 +0000 Subject: [PATCH 09/12] Revert "use Kafka 0.8.2.2 (for 15 minutes timeout issue)" This reverts commit f76b6d4f9ed89833185dd3413dd93bc03fed943b. --- envvar | 2 +- first-setup-ubuntu | 2 +- setup-after-git-clean | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/envvar b/envvar index 656f4114d..eae974156 100755 --- a/envvar +++ b/envvar @@ -5,7 +5,7 @@ if [ "$SHLVL" != "1" ] || [ "$PS1" == "" ]; then exit 1 fi -export KAFKA_VERSION=0.8.2.2 +export KAFKA_VERSION=2.6.0 export PATH=${HOME}/.venvs/bin:${PATH} . 
travis_java_install.sh diff --git a/first-setup-ubuntu b/first-setup-ubuntu index 1fab8ee8c..6b9314846 100755 --- a/first-setup-ubuntu +++ b/first-setup-ubuntu @@ -1,6 +1,6 @@ #!/bin/bash -export KAFKA_VERSION=0.8.2.2 +export KAFKA_VERSION=2.6.0 sudo apt-get update sudo apt-get -y install python3 python3-pip python3-venv openjdk-11-jre libsnappy-dev libzstd-dev python3-full diff --git a/setup-after-git-clean b/setup-after-git-clean index 9720f201f..81570db12 100755 --- a/setup-after-git-clean +++ b/setup-after-git-clean @@ -1,6 +1,6 @@ #!/bin/bash -export KAFKA_VERSION=0.8.2.2 +export KAFKA_VERSION=2.6.0 export PATH=${HOME}/.venvs/bin:${PATH} python -m build From f297764334396ac53b8ea80b95606299c30c4423 Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Fri, 16 Aug 2024 02:09:39 +0000 Subject: [PATCH 10/12] Add kill-extra-java to verify SASL test --- kill-extra-java | 13 +++++++++++++ 1 file changed, 13 insertions(+) create mode 100755 kill-extra-java diff --git a/kill-extra-java b/kill-extra-java new file mode 100755 index 000000000..86c196ae1 --- /dev/null +++ b/kill-extra-java @@ -0,0 +1,13 @@ +#!/bin/bash + +while [ true ]; do + ps auxf | grep -v grep | grep kafka.pro | awk '{print $2}' > kafka-pids + kafka_count=$(cat kafka-pids | wc -l) + if [ $kafka_count -ge 2 ]; then + kafka_kill_pid=$(head -n 1 kafka-pids) + date "+%F %T " | tr -d "\n" + echo "Detected $kafka_count Kafka, killing $kafka_kill_pid" + kill $kafka_kill_pid + fi + sleep 1 +done From 1608037feb20570beb99fb972a9229f53e4fb391 Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Fri, 16 Aug 2024 02:22:22 +0000 Subject: [PATCH 11/12] Try to find out why Kafka 0.8.2.2 timeout on github runner --- .github/workflows/python-package.yml | 13 +------------ tox.ini | 2 +- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index c5bd66218..3cf960b8a 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -61,12 +61,8 @@ jobs: fail-fast: false matrix: python-version: - - "3.8" - - "3.9" - "3.10" - - "3.11" - "3.12" - - "pypy3.9" experimental: [ false ] steps: - name: Checkout the source code @@ -116,19 +112,12 @@ jobs: fail-fast: false matrix: kafka-version: - - "0.9.0.1" - - "0.10.2.2" - - "0.11.0.2" - - "0.11.0.3" - - "1.1.1" - - "2.4.0" - - "2.5.0" - "2.6.0" python-version: ['3.12'] experimental: [false] include: - kafka-version: '0.8.2.2' - experimental: true + experimental: false python-version: "3.12" - kafka-version: '0.8.2.2' experimental: false diff --git a/tox.ini b/tox.ini index 044f067a8..646943171 100644 --- a/tox.ini +++ b/tox.ini @@ -3,7 +3,7 @@ envlist = py312 [pytest] #testpaths = test/test_ssl_integration.py -testpaths = test +testpaths = test/test_partitioner.py test/test_producer.py addopts = --durations=10 log_format = %(created)f %(filename)-23s %(threadName)s %(message)s From 97887e00059da5ba45a0c0fd66e8203b6f002d2d Mon Sep 17 00:00:00 2001 From: Orange Kao Date: Fri, 16 Aug 2024 02:46:03 +0000 Subject: [PATCH 12/12] Narrow down the cause of test timeout in test/test_producer.py --- test/test_producer.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/test_producer.py b/test/test_producer.py index 15c244113..a718811e0 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -12,7 +12,7 @@ @pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") -def test_buffer_pool(): 
+def xtest_buffer_pool(): pool = SimpleBufferPool(1000, 1000) buf1 = pool.allocate(1000, 1000) @@ -26,7 +26,7 @@ def test_buffer_pool(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) -def test_end_to_end(kafka_broker, compression): +def xtest_end_to_end(kafka_broker, compression): if compression == 'lz4': if env_kafka_version() < (0, 8, 2): pytest.skip('LZ4 requires 0.8.2') @@ -73,7 +73,7 @@ def test_end_to_end(kafka_broker, compression): @pytest.mark.skipif(platform.python_implementation() != 'CPython', reason='Test relies on CPython-specific gc policies') @pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") -def test_kafka_producer_gc_cleanup(): +def xtest_kafka_producer_gc_cleanup(): gc.collect() threads = threading.active_count() producer = KafkaProducer(api_version='0.9') # set api_version explicitly to avoid auto-detection @@ -85,7 +85,7 @@ def test_kafka_producer_gc_cleanup(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) +@pytest.mark.parametrize("compression", ['gzip', 'snappy', 'lz4', 'zstd']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): if compression == 'zstd' and env_kafka_version() < (2, 1, 0): pytest.skip('zstd requires 2.1.0 or more')
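
Taken together, these patches attack the fixture-startup timeouts by scaling the base timeouts through environment variables (RETRY_TIMEOUT_MULTIPLIER and MAX_TIMEOUT_MULTIPLIER, set in tox.ini) and retrying service startup with exponential backoff in test/fixtures.py. The standalone sketch below condenses that pattern for reference; it is not part of the patches, and the try_start callable is hypothetical, standing in for the ZookeeperFixture/KafkaFixture attempt that spawns the child process and waits for its ready pattern.

    import os
    import time

    def start_with_retries(try_start):
        # Base timeouts mirror test/fixtures.py; the multipliers come from the
        # environment variables that tox.ini sets in these patches.
        timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
        max_timeout = 120 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
        backoff = 1
        end_at = time.time() + max_timeout
        while time.time() < end_at:
            # Cap each attempt so the final try does not overshoot the deadline.
            attempt_timeout = min(timeout, max(end_at - time.time(), 0))
            if try_start(attempt_timeout):
                return
            timeout *= 2
            time.sleep(backoff)
            backoff += 1
        raise RuntimeError('Failed to start service before max_timeout')

With both multipliers at 8 (patch 02) or 1024 (patch 04), the 120-second ceiling stretches to 16 minutes or roughly 34 hours respectively; the intent appears to be giving the slower GitHub runners enough room to see whether the broker ever comes up before wait_for raises "Child thread died already."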