Skip to content

Commit

Permalink
[#3558] Use Kafka in Raft mode for running integration tests
Browse files Browse the repository at this point in the history
Also updated Kafka container image used for ITs to version 3.5.0.

Fixes #3558
  • Loading branch information
sophokles73 committed Oct 29, 2023
1 parent 6fc41a1 commit 98e1ab2
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 43 deletions.
3 changes: 1 addition & 2 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
<javax.transaction-api.version>1.3</javax.transaction-api.version>
<java-base-image.name>docker.io/library/eclipse-temurin:17-jre-jammy</java-base-image.name>
<jjwt.version>0.11.5</jjwt.version>
<kafka.image.name>docker.io/confluentinc/cp-kafka:7.3.5</kafka.image.name>
<kafka.image.name>docker.io/confluentinc/cp-kafka:7.5.0</kafka.image.name>
<logback.version>1.4.11</logback.version>
<mongodb-image.name>docker.io/library/mongo:6.0</mongodb-image.name>
<native.image.name>quay.io/quarkus/quarkus-micro-image:2.0</native.image.name>
Expand All @@ -51,7 +51,6 @@
<slf4j.version>2.0.6</slf4j.version>
<spring-security-crypto.version>6.1.4</spring-security-crypto.version>
<truth.version>1.1.3</truth.version>
<zookeeper.image.name>docker.io/confluentinc/cp-zookeeper:7.3.5</zookeeper.image.name>

<!-- The port at which the health check server should expose its resources -->
<health.check.port>8088</health.check.port>
Expand Down
2 changes: 2 additions & 0 deletions site/homepage/content/release-notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ description = "Information about changes in recent Hono releases. Includes new f
* When using Pub/Sub messaging, there were potentially issues concerning the AMQP connection between protocol adapter
and command router, leading for example to timeouts when MQTT devices subscribed/unsubscribed to the command topic.
This has been fixed.
* The integration tests now use Apache Kafka 3.5.0 in Raft mode which no longer requires running a separate Apache Zookeeper
instance and thus simplifies test setup and configuration.

### Deprecations

Expand Down
53 changes: 14 additions & 39 deletions tests/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
Expand Down Expand Up @@ -1211,34 +1211,6 @@
</env>
</run>
</image>
<!-- ##### Zookeeper ##### -->
<image>
<name>${zookeeper.image.name}</name>
<alias>hono-zookeeper-test</alias>
<run>
<skip>${hono.kafka-messaging.disabled}</skip>
<network>
<mode>custom</mode>
<name>${custom.network.name}</name>
<alias>zookeeper</alias>
</network>
<memorySwap>400000000</memorySwap>
<memory>400000000</memory>
<log>
<prefix>Zookeeper</prefix>
<color>${log.color.kafka}</color>
</log>
<wait>
<time>${service.startup.timeout}</time>
<log>.*(binding to port).*</log>
</wait>
<env>
<KAFKA_HEAP_OPTS>-Xms240M -Xmx240M</KAFKA_HEAP_OPTS>
<ZOOKEEPER_CLIENT_PORT>2181</ZOOKEEPER_CLIENT_PORT>
<ZOOKEEPER_TICK_TIME>2000</ZOOKEEPER_TICK_TIME>
</env>
</run>
</image>
<!-- ##### Kafka ##### -->
<image>
<name>${kafka.image.name}</name>
Expand All @@ -1261,25 +1233,28 @@
</log>
<wait>
<time>${service.startup.timeout}</time>
<log>.*(\[KafkaServer id=1\] started).*</log>
<log>.*(Kafka startTimeMs:).*</log>
</wait>
<env>
<CLUSTER_ID>q1Sh-9_ISia_zwGINzRvyQ</CLUSTER_ID>
<KAFKA_ADVERTISED_LISTENERS>DOCKER_INTERNAL://kafka:9092,DOCKER_EXTERNAL://${docker.host.address}:${kafka.port}</KAFKA_ADVERTISED_LISTENERS>
<KAFKA_BROKER_ID>1</KAFKA_BROKER_ID>
<KAFKA_CONTROLLER_LISTENER_NAMES>CONTROLLER</KAFKA_CONTROLLER_LISTENER_NAMES>
<KAFKA_CONTROLLER_QUORUM_VOTERS>1@127.0.0.1:9093</KAFKA_CONTROLLER_QUORUM_VOTERS>
<KAFKA_NODE_ID>1</KAFKA_NODE_ID>
<KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS>0</KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS>
<KAFKA_HEAP_OPTS>-Xms1024m -Xmx1024m</KAFKA_HEAP_OPTS>
<KAFKA_INTER_BROKER_LISTENER_NAME>DOCKER_INTERNAL</KAFKA_INTER_BROKER_LISTENER_NAME>
<KAFKA_JVM_PERFORMANCE_OPTS>-server -XX:+UseG1GC -XX:MaxGCPauseMillis=40 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true</KAFKA_JVM_PERFORMANCE_OPTS>
<KAFKA_LISTENERS>DOCKER_INTERNAL://0.0.0.0:9092,DOCKER_EXTERNAL://0.0.0.0:9094</KAFKA_LISTENERS>
<KAFKA_LISTENER_SECURITY_PROTOCOL_MAP>DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT</KAFKA_LISTENER_SECURITY_PROTOCOL_MAP>
<KAFKA_LISTENERS>DOCKER_INTERNAL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093,DOCKER_EXTERNAL://0.0.0.0:9094</KAFKA_LISTENERS>
<KAFKA_LISTENER_SECURITY_PROTOCOL_MAP>CONTROLLER:PLAINTEXT,DOCKER_INTERNAL:PLAINTEXT,DOCKER_EXTERNAL:PLAINTEXT</KAFKA_LISTENER_SECURITY_PROTOCOL_MAP>
<!-- increase log cleaner check frequency (default is 5min) for test where records shall be removed -->
<KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS>1100</KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS>
<!-- log level for kafka.server.KafkaServer needs to stay on INFO so that docker-maven-plugin can wait for container startup -->
<KAFKA_LOG4J_LOGGERS>kafka.cluster=${hono.kafka.log-level},kafka.controller=${hono.kafka.log-level},kafka.coordinator=${hono.kafka.log-level},kafka.log=${hono.kafka.log-level},kafka.authorizer=${hono.kafka.log-level},kafka.zk=${hono.kafka.log-level},state.change.logger=${hono.kafka.log-level},kafka.server=${hono.kafka.log-level},kafka.server.KafkaServer=INFO</KAFKA_LOG4J_LOGGERS>
<KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS>1000</KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS>
<KAFKA_LOG_SEGMENT_DELETE_DELAY_MS>0</KAFKA_LOG_SEGMENT_DELETE_DELAY_MS>
<!-- log level for AppInfoParser needs to be pinned to INFO so that docker-maven-plugin can wait for container startup -->
<KAFKA_LOG4J_LOGGERS>kafka=${hono.kafka.log-level},org.apache.kafka=${hono.kafka.log-level},state.change.logger=${hono.kafka.log-level},org.apache.kafka.common.utils.AppInfoParser=INFO</KAFKA_LOG4J_LOGGERS>
<KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR>1</KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR>
<KAFKA_PROCESS_ROLES>broker,controller</KAFKA_PROCESS_ROLES>
<KAFKA_REST_BOOTSTRAP_SERVERS>127.0.0.1:9092</KAFKA_REST_BOOTSTRAP_SERVERS>
<KAFKA_TRANSACTION_STATE_LOG_MIN_ISR>1</KAFKA_TRANSACTION_STATE_LOG_MIN_ISR>
<KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR>1</KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR>
<KAFKA_ZOOKEEPER_CONNECT>zookeeper:2181</KAFKA_ZOOKEEPER_CONNECT>
</env>
</run>
</image>
Expand Down
1 change: 0 additions & 1 deletion tests/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ This starts the following Docker containers and runs the test cases against them
* MongoDB server
* Hono Authentication service
* Hono Device Registration service
* Apache Zookeeper
* Apache Kafka
* Hono Command Router service
* Hono HTTP adapter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ private Future<ProtonReceiver> subscribeToCommands(

final Promise<ProtonReceiver> result = Promise.promise();
context.runOnContext(go -> {
log.debug("creating command consumer for tenant [{}]", tenantId);
final ProtonReceiver recv = connection.createReceiver(endpointConfig.getSubscriptionAddress(
tenantId, commandTargetDeviceId));
recv.setAutoAccept(false);
Expand Down Expand Up @@ -321,7 +322,10 @@ public void testSendAsyncCommandsSucceeds(
: deviceId;

final int totalNoOfCommandsToSend = 60;
connectAndSubscribe(ctx, commandTargetDeviceId, endpointConfig,
connectAndSubscribe(
ctx,
commandTargetDeviceId,
endpointConfig,
(cmdReceiver, cmdResponseSender) -> createCommandConsumer(ctx, cmdReceiver, cmdResponseSender),
totalNoOfCommandsToSend);
if (ctx.failed()) {
Expand Down

0 comments on commit 98e1ab2

Please sign in to comment.