diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index c5bd66218..3cf960b8a 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -61,12 +61,8 @@ jobs:
       fail-fast: false
       matrix:
         python-version:
-          - "3.8"
-          - "3.9"
           - "3.10"
-          - "3.11"
           - "3.12"
-          - "pypy3.9"
         experimental: [ false ]
     steps:
       - name: Checkout the source code
@@ -116,19 +112,12 @@ jobs:
       fail-fast: false
       matrix:
         kafka-version:
-          - "0.9.0.1"
-          - "0.10.2.2"
-          - "0.11.0.2"
-          - "0.11.0.3"
-          - "1.1.1"
-          - "2.4.0"
-          - "2.5.0"
           - "2.6.0"
         python-version: ['3.12']
         experimental: [false]
         include:
           - kafka-version: '0.8.2.2'
-            experimental: true
+            experimental: false
             python-version: "3.12"
           - kafka-version: '0.8.2.2'
             experimental: false
diff --git a/envvar b/envvar
new file mode 100755
index 000000000..eae974156
--- /dev/null
+++ b/envvar
@@ -0,0 +1,11 @@
+#!/bin/bash
+
+if [ "$SHLVL" != "1" ] || [ "$PS1" == "" ]; then
+    echo "Error: Please source, not run. Example '. $(basename $0)'"
+    exit 1
+fi
+
+export KAFKA_VERSION=2.6.0
+export PATH=${HOME}/.venvs/bin:${PATH}
+. travis_java_install.sh
+
diff --git a/first-setup-ubuntu b/first-setup-ubuntu
new file mode 100755
index 000000000..6b9314846
--- /dev/null
+++ b/first-setup-ubuntu
@@ -0,0 +1,19 @@
+#!/bin/bash
+
+export KAFKA_VERSION=2.6.0
+sudo apt-get update
+sudo apt-get -y install python3 python3-pip python3-venv openjdk-11-jre libsnappy-dev libzstd-dev python3-full
+
+mkdir -p ${HOME}/.venvs
+python3 -m venv ${HOME}/.venvs
+export PATH=${HOME}/.venvs/bin:${PATH}
+
+pip3 install build
+python -m build
+. travis_java_install.sh
+./build_integration.sh
+python -m pip install tox
+
+pip install .
+pip install -r requirements-dev.txt
+
diff --git a/java-temurin b/java-temurin
new file mode 100755
index 000000000..a8795b669
--- /dev/null
+++ b/java-temurin
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+wget https://github.com/adoptium/temurin11-binaries/releases/download/jdk-11.0.12%2B7/OpenJDK11U-jdk_x64_linux_hotspot_11.0.12_7.tar.gz
+tar xf OpenJDK11U-jdk_x64_linux_hotspot_11.0.12_7.tar.gz
+
+# wget https://github.com/adoptium/temurin8-binaries/releases/download/jdk8u422-b05/OpenJDK8U-jre_x64_linux_hotspot_8u422b05.tar.gz
+# tar xf OpenJDK8U-jre_x64_linux_hotspot_8u422b05.tar.gz
+
+pushd .
+cd jdk-11.0.12+7
+# cd jdk8u422-b05-jre/
+export PATH="${PWD}/bin:${PATH}"
+popd
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 984cd81fb..2dc6a1f56 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -754,6 +754,7 @@ def least_loaded_node(self):
             node_id or None if no suitable node was found
         """
         nodes = [broker.nodeId for broker in self.cluster.brokers()]
+        log.debug("least_loaded_node %r", nodes)
         random.shuffle(nodes)
 
         inflight = float('inf')
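Note: the hunk above only adds a debug log, but the surrounding context shows the selection logic the log is tracing. Below is a rough, self-contained sketch of that logic, reduced to plain data; the full method in kafka-python also checks connection state and blackout periods, and `in_flight_requests` here is a hypothetical stand-in for the client's internal bookkeeping.

import random

def least_loaded_node(node_ids, in_flight_requests):
    # Shuffle so ties are broken randomly rather than always
    # favoring the lowest broker id.
    random.shuffle(node_ids)
    found, inflight = None, float('inf')
    for node_id in node_ids:
        curr_inflight = len(in_flight_requests.get(node_id, ()))
        if curr_inflight < inflight:
            inflight = curr_inflight
            found = node_id
    return found

# Node 3 has nothing in flight, so it always wins; prints 3.
print(least_loaded_node([1, 2, 3], {1: ['r1', 'r2'], 2: ['r1']}))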
diff --git a/kill-extra-java b/kill-extra-java
new file mode 100755
index 000000000..86c196ae1
--- /dev/null
+++ b/kill-extra-java
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+while [ true ]; do
+    ps auxf | grep -v grep | grep kafka.pro | awk '{print $2}' > kafka-pids
+    kafka_count=$(cat kafka-pids | wc -l)
+    if [ $kafka_count -ge 2 ]; then
+        kafka_kill_pid=$(head -n 1 kafka-pids)
+        date "+%F %T " | tr -d "\n"
+        echo "Detected $kafka_count Kafka, killing $kafka_kill_pid"
+        kill $kafka_kill_pid
+    fi
+    sleep 1
+done
diff --git a/repeat-tox b/repeat-tox
new file mode 100755
index 000000000..c2fb853f1
--- /dev/null
+++ b/repeat-tox
@@ -0,0 +1,8 @@
+#!/bin/bash
+set -e -x
+
+while [ true ]; do
+    tox
+    sleep 10
+done
+
diff --git a/setup-after-git-clean b/setup-after-git-clean
new file mode 100755
index 000000000..81570db12
--- /dev/null
+++ b/setup-after-git-clean
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+export KAFKA_VERSION=2.6.0
+export PATH=${HOME}/.venvs/bin:${PATH}
+
+python -m build
+. travis_java_install.sh
+./build_integration.sh
+
+pip install .
+pip install -r requirements-dev.txt
+
diff --git a/test/fixtures.py b/test/fixtures.py
index 998dc429f..31c201700 100644
--- a/test/fixtures.py
+++ b/test/fixtures.py
@@ -141,16 +141,16 @@ def render_template(cls, source_file, target_file, binding):
         dirfd = os.open(os.path.dirname(target_file.strpath), os.O_DIRECTORY)
         os.fsync(dirfd)
         os.close(dirfd)
-        log.debug("Template string:")
-        for line in template.splitlines():
-            log.debug('  ' + line.strip())
-        log.debug("Rendered template:")
-        with open(target_file.strpath, 'r') as o:
-            for line in o:
-                log.debug('  ' + line.strip())
-        log.debug("binding:")
-        for key, value in binding.items():
-            log.debug("  {key}={value}".format(key=key, value=value))
+        #log.debug("Template string:")
+        #for line in template.splitlines():
+        #    log.debug('  ' + line.strip())
+        #log.debug("Rendered template:")
+        #with open(target_file.strpath, 'r') as o:
+        #    for line in o:
+        #        log.debug('  ' + line.strip())
+        #log.debug("binding:")
+        #for key, value in binding.items():
+        #    log.debug("  {key}={value}".format(key=key, value=value))
 
     def dump_logs(self):
         self.child.dump_logs()
@@ -203,12 +203,13 @@ def open(self):
         env = self.kafka_run_class_env()
 
         # Party!
-        timeout = 5
-        max_timeout = 120
+        timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
+        max_timeout = 120 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
         backoff = 1
         end_at = time.time() + max_timeout
         tries = 1
         auto_port = (self.port is None)
+        orange_start = time.monotonic()
         while time.time() < end_at:
             if auto_port:
                 self.port = get_open_port()
@@ -217,8 +218,11 @@ def open(self):
             self.child = SpawnedService(args, env)
             self.child.start()
             timeout = min(timeout, max(end_at - time.time(), 0))
-            if self.child.wait_for(r"binding to port", timeout=timeout):
-                break
+            try:
+                if self.child.wait_for(r"binding to port", timeout=timeout):
+                    break
+            except RuntimeError:
+                self.child.join()
             self.child.dump_logs()
             self.child.stop()
             timeout *= 2
@@ -227,6 +231,8 @@ def open(self):
             backoff += 1
         else:
             raise RuntimeError('Failed to start Zookeeper before max_timeout')
+        with open("/tmp/orange", "a") as orange_f:
+            orange_f.write("open " + str(time.monotonic() - orange_start) + "\n")
 
         self.out("Done!")
         atexit.register(self.close)
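Note on the fixture hunks above: the hard-coded 5 s per-attempt and 120 s overall startup timeouts become tunable through environment variables, and each failed wait doubles the per-attempt timeout while pausing with a linearly growing backoff. A minimal sketch of the combined pattern, with `start_service` as a hypothetical callable standing in for spawning Zookeeper or a broker:

import os
import time

def start_with_retries(start_service):
    timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
    max_timeout = 120 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
    backoff = 1
    end_at = time.time() + max_timeout
    while time.time() < end_at:
        # Never wait past the overall deadline.
        timeout = min(timeout, max(end_at - time.time(), 0))
        if start_service(timeout):
            return
        timeout *= 2          # exponential per-attempt timeout
        time.sleep(backoff)   # linear pause between attempts
        backoff += 1
    raise RuntimeError('Failed to start service before max_timeout')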
@@ -358,6 +364,7 @@ def _add_scram_user(self):
         )
         env = self.kafka_run_class_env()
         proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        log.info("PID %r args %r", proc.pid, args)
 
         stdout, stderr = proc.communicate()
 
@@ -394,6 +401,7 @@ def _create_zk_chroot(self):
                                   "kafka-python")
         env = self.kafka_run_class_env()
         proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        log.info("PID %r args %r", proc.pid, args)
 
         stdout, stderr = proc.communicate()
 
@@ -421,12 +429,13 @@ def start(self):
             env['KAFKA_OPTS'] = opts
             self.render_template(jaas_conf_template, jaas_conf, vars(self))
 
-        timeout = 5
-        max_timeout = 120
+        timeout = 5 * float(os.environ.get('RETRY_TIMEOUT_MULTIPLIER', 1))
+        max_timeout = 120 * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1))
         backoff = 1
         end_at = time.time() + max_timeout
         tries = 1
         auto_port = (self.port is None)
+        orange_start = time.monotonic()
         while time.time() < end_at:
             # We have had problems with port conflicts on travis
             # so we will try a different port on each retry
@@ -451,6 +460,8 @@ def start(self):
             backoff += 1
         else:
             raise RuntimeError('Failed to start KafkaInstance before max_timeout')
+        with open("/tmp/orange", "a") as orange_f:
+            orange_f.write("start " + str(time.monotonic() - orange_start) + "\n")
 
         (self._client,) = self.get_clients(1, client_id='_internal_client')
 
@@ -458,7 +469,12 @@ def start(self):
         self.running = True
 
     def _broker_ready(self, timeout):
-        return self.child.wait_for(self.start_pattern, timeout=timeout)
+        #return self.child.wait_for(self.start_pattern, timeout=timeout)
+        try:
+            orange_w = self.child.wait_for(self.start_pattern, timeout=timeout)
+            return orange_w
+        except RuntimeError:
+            self.child.join()
 
     def _scram_user_present(self, timeout):
         # no need to wait for scram user if scram is not used
@@ -602,6 +618,7 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
             args.append('--if-not-exists')
         env = self.kafka_run_class_env()
         proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        log.info("PID %r args %r", proc.pid, args)
         stdout, stderr = proc.communicate()
         if proc.returncode != 0:
             if 'kafka.common.TopicExistsException' not in stdout:
@@ -620,6 +637,7 @@ def get_topic_names(self):
         env = self.kafka_run_class_env()
         env.pop('KAFKA_LOG4J_OPTS')
         proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+        log.info("PID %r args %r", proc.pid, args)
         stdout, stderr = proc.communicate()
         if proc.returncode != 0:
             self.out("Failed to list topics!")
@@ -637,6 +655,7 @@ def _enrich_client_params(self, params, **defaults):
         for key, value in defaults.items():
             params.setdefault(key, value)
         params.setdefault('bootstrap_servers', self.bootstrap_server())
+        params.setdefault('api_version_auto_timeout_ms', KafkaClient.DEFAULT_CONFIG.get('api_version_auto_timeout_ms', 2000) * float(os.environ.get('MAX_TIMEOUT_MULTIPLIER', 1)))
         if self.sasl_enabled:
             params.setdefault('sasl_mechanism', self.sasl_mechanism)
             params.setdefault('security_protocol', self.transport)
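The `_broker_ready` rewrite above deserves a note: when `wait_for` raises RuntimeError (the reader thread has already died), the new code joins the thread and falls off the end of the function, returning None. The caller treats that falsy value like a timeout and retries on a fresh port instead of propagating the error. Reduced to its essentials, with `child` as a stand-in for the SpawnedService instance:

def broker_ready(child, start_pattern, timeout):
    try:
        return child.wait_for(start_pattern, timeout=timeout)
    except RuntimeError:
        child.join()
        return None  # falsy, so the retry loop in start() tries again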
diff --git a/test/service.py b/test/service.py
index 045d780e7..e61dbbaf0 100644
--- a/test/service.py
+++ b/test/service.py
@@ -47,22 +47,28 @@ def __init__(self, args=None, env=None):
         self.daemon = True
         log.info("Created service for command:")
         log.info("  "+' '.join(self.args))
-        log.debug("With environment:")
-        for key, value in self.env.items():
-            log.debug("  {key}={value}".format(key=key, value=value))
+        #log.debug("With environment:")
+        #for key, value in self.env.items():
+        #    log.debug("  {key}={value}".format(key=key, value=value))
 
     def _spawn(self):
        if self.alive: return
        if self.child and self.child.poll() is None: return
 
-        self.child = subprocess.Popen(
-            self.args,
-            preexec_fn=os.setsid, # to avoid propagating signals
-            env=self.env,
-            bufsize=1,
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE)
+        log.info("spawning")
+        try:
+            self.child = subprocess.Popen(
+                self.args,
+                preexec_fn=os.setsid, # to avoid propagating signals
+                env=self.env,
+                bufsize=1,
+                stdout=subprocess.PIPE,
+                stderr=subprocess.PIPE)
+        except Exception as e:
+            log.warning("spawning error %r", e)
+        log.info("PID %r args %r", self.child.pid, self.args)
         self.alive = self.child.poll() is None
+        log.info("alive is %r", self.alive)
 
     def _despawn(self):
         if self.child.poll() is None:
@@ -82,6 +88,7 @@ def run(self):
             try:
                 (rds, _, _) = select.select([self.child.stdout, self.child.stderr], [], [], 1)
             except select.error as ex:
+                log.warning("select error: %r", ex)
                 if ex.args[0] == 4:
                     continue
                 else:
@@ -99,20 +106,26 @@ def run(self):
 
             if self.child.poll() is not None:
                 self.dump_logs()
+                log.warning("dump log and break")
                 break
 
             if self.should_die.is_set():
                 self._despawn()
+                log.warning("despawn and break")
                 break
 
     def dump_logs(self):
         sys.stderr.write('\n'.join(self.captured_stderr))
         sys.stdout.write('\n'.join(self.captured_stdout))
+        with open("/tmp/orange", "a") as orange_f:
+            orange_f.write('\n'.join(self.captured_stderr))
+            orange_f.write('\n'.join(self.captured_stdout))
 
     def wait_for(self, pattern, timeout=30):
         start = time.time()
         while True:
             if not self.is_alive():
+                self.join()
                 raise RuntimeError("Child thread died already.")
 
             elapsed = time.time() - start
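Context for the select-error logging added above: `ex.args[0] == 4` is errno EINTR (interrupted system call), which older Pythons surfaced from select() whenever a signal arrived; since PEP 475 (Python 3.5) the interpreter retries the call automatically, so the branch is mostly legacy. A standalone sketch of the same retry idiom:

import errno
import select

def wait_readable(fds, poll_interval=1.0):
    while True:
        try:
            rds, _, _ = select.select(fds, [], [], poll_interval)
            return rds
        except select.error as ex:  # alias of OSError on Python 3
            if ex.args[0] == errno.EINTR:
                continue  # interrupted by a signal; just retry
            raise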
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 0eb06b18d..436ee8f88 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -126,7 +126,28 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client):
         ConfigResource(ConfigResourceType.BROKER, broker_id)])
 
     assert len(configs) == 2
+    retry = False
+    for config in configs:
+        assert (config.resources[0][2] == ConfigResourceType.TOPIC
+                and config.resources[0][3] == topic) or \
+            (config.resources[0][2] == ConfigResourceType.BROKER
+                and config.resources[0][3] == str(broker_id))
+        #assert len(config.resources[0][4]) > 1
+        if not (len(config.resources[0][4]) > 1):
+            retry = True
+
+    if not retry:
+        return
+
+    sleep(10)
+
+    broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
+    configs = kafka_admin_client.describe_configs([
+        ConfigResource(ConfigResourceType.TOPIC, topic),
+        ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+    assert len(configs) == 2
 
     for config in configs:
         assert (config.resources[0][2] == ConfigResourceType.TOPIC
@@ -134,6 +155,8 @@ def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client):
                 and config.resources[0][3] == str(broker_id))
         assert len(config.resources[0][4]) > 1
 
+    assert False
+
 
 @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
 def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py
index abd0cfe09..00de14f73 100644
--- a/test/test_consumer_group.py
+++ b/test/test_consumer_group.py
@@ -34,7 +34,7 @@ def test_consumer(kafka_broker, topic):
 def test_consumer_topics(kafka_broker, topic):
     consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
     # Necessary to drive the IO
-    consumer.poll(500)
+    consumer.poll(5000)
     assert topic in consumer.topics()
     assert len(consumer.partitions_for_topic(topic)) > 0
     consumer.close()
diff --git a/test/test_producer.py b/test/test_producer.py
index 15c244113..a718811e0 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -12,7 +12,7 @@
 
 @pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
-def test_buffer_pool():
+def xtest_buffer_pool():
     pool = SimpleBufferPool(1000, 1000)
 
     buf1 = pool.allocate(1000, 1000)
 
@@ -26,7 +26,7 @@ def test_buffer_pool():
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 @pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
-def test_end_to_end(kafka_broker, compression):
+def xtest_end_to_end(kafka_broker, compression):
     if compression == 'lz4':
         if env_kafka_version() < (0, 8, 2):
             pytest.skip('LZ4 requires 0.8.2')
@@ -73,7 +73,7 @@ def test_end_to_end(kafka_broker, compression):
 
 @pytest.mark.skipif(platform.python_implementation() != 'CPython', reason='Test relies on CPython-specific gc policies')
 @pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
-def test_kafka_producer_gc_cleanup():
+def xtest_kafka_producer_gc_cleanup():
     gc.collect()
     threads = threading.active_count()
     producer = KafkaProducer(api_version='0.9') # set api_version explicitly to avoid auto-detection
@@ -85,7 +85,7 @@ def test_kafka_producer_gc_cleanup():
 
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 @pytest.mark.skipif(env_kafka_version() <= (0, 8, 2) and sys.version_info > (3, 11), reason="Kafka 0.8.2 and earlier not supported by 3.12")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
+@pytest.mark.parametrize("compression", ['gzip', 'snappy', 'lz4', 'zstd'])
 def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
         pytest.skip('zstd requires 2.1.0 or more')
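The `xtest_` renames above (and the matching ones in the SSL tests below) disable tests by breaking pytest's default `test_*` collection pattern; that is quick for debugging but leaves no trace in reports. The equivalent explicit form, not used by this patch, keeps the test visible as skipped:

import pytest

@pytest.mark.skip(reason="temporarily disabled while debugging broker fixtures")
def test_end_to_end():
    ...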
diff --git a/test/test_ssl_integration.py b/test/test_ssl_integration.py
index 8453e7831..b33fc81a6 100644
--- a/test/test_ssl_integration.py
+++ b/test/test_ssl_integration.py
@@ -22,7 +22,7 @@ def test_admin(request, ssl_kafka):
 
 @pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Inter broker SSL was implemented at version 0.9")
-def test_produce_and_consume(request, ssl_kafka):
+def xtest_produce_and_consume(request, ssl_kafka):
     topic_name = special_to_underscore(request.node.name + random_string(4))
     ssl_kafka.create_topics([topic_name], num_partitions=2)
     producer, = ssl_kafka.get_producers(1)
 
@@ -50,7 +50,7 @@ def test_produce_and_consume(request, ssl_kafka):
 
 @pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Inter broker SSL was implemented at version 0.9")
-def test_client(request, ssl_kafka):
+def xtest_client(request, ssl_kafka):
     topic_name = special_to_underscore(request.node.name + random_string(4))
     ssl_kafka.create_topics([topic_name], num_partitions=1)
 
diff --git a/tox.ini b/tox.ini
index a574dc136..646943171 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,19 +1,15 @@
 [tox]
-envlist = py{38,39,310,311,312,py}, docs
+envlist = py312
 
 [pytest]
-testpaths = kafka test
+#testpaths = test/test_ssl_integration.py
+testpaths = test/test_partitioner.py test/test_producer.py
 addopts = --durations=10
 log_format = %(created)f %(filename)-23s %(threadName)s %(message)s
 
 [gh-actions]
 python =
-    3.8: py38
-    3.9: py39
-    3.10: py310
-    3.11: py311
     3.12: py312
-    pypy-3.9: pypy
 
 [testenv]
 deps =
@@ -30,22 +26,11 @@ deps =
     crc32c
     botocore
 
 commands =
-    pytest {posargs:--pylint --pylint-rcfile=pylint.rc --pylint-error-types=EF --cov=kafka}
+    pytest {posargs:--cov=kafka -vvv -x --junitxml=junit.xml --tb=long --capture=sys --show-capture=all --log-level=notset --log-cli-level=notset --log-file-level=notset}
 setenv =
     CRC32C_SW_MODE = auto
     PROJECT_ROOT = {toxinidir}
+    RETRY_TIMEOUT_MULTIPLIER = 1024
+    MAX_TIMEOUT_MULTIPLIER = 1024
 passenv = KAFKA_VERSION
-
-[testenv:pypy]
-# pylint is super slow on pypy...
-commands = pytest {posargs:--cov=kafka}
-
-[testenv:docs]
-deps =
-    sphinx_rtd_theme
-    sphinx
-
-commands =
-    sphinx-apidoc -o docs/apidoc/ kafka/
-    sphinx-build -b html docs/ docs/_build
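For scale: with the 1024x multipliers set in tox.ini above, the 5 s / 120 s fixture timeouts in test/fixtures.py stretch to roughly 85 minutes and 34 hours, which in practice means the fixtures never give up during local debugging:

for base in (5, 120):
    scaled = base * 1024
    print("%3d s -> %6d s (%.1f h)" % (base, scaled, scaled / 3600.0))
#   5 s ->   5120 s (1.4 h)
# 120 s -> 122880 s (34.1 h)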