Bump Kafka version and switch to EmbeddedKafka
pbernet committed Jul 28, 2024
1 parent 743c013 commit ea4ed73
Showing 6 changed files with 25 additions and 52 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -11,7 +11,7 @@ val pekkoHTTPVersion = "1.1.0-M1"
val pekkoConnectorVersion = "1.0.2"
val pekkoConnectorKafkaVersion = "1.0.0"

val kafkaVersion = "3.6.1"
val kafkaVersion = "3.7.0"
val activemqVersion = "5.18.4" // We are stuck with 5.x
val artemisVersion = "2.35.0"
val testContainersVersion = "1.19.8"
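
Not shown in this hunk: switching the test suite to EmbeddedKafka also implies an embedded-kafka test dependency somewhere in build.sbt. A minimal sketch of how it is typically declared, assuming the project reuses kafkaVersion for it (the actual coordinates and version used in this commit are not shown here):

// Hedged sketch; reusing kafkaVersion for the embedded-kafka version is an assumption
libraryDependencies += "io.github.embeddedkafka" %% "embedded-kafka" % kafkaVersion % Test
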
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
@@ -61,7 +61,7 @@ services:
- runtime-net

broker:
- image: confluentinc/cp-kafka
+ image: confluentinc/cp-kafka:7.7.0
hostname: broker
depends_on:
- zookeeper
6 changes: 2 additions & 4 deletions src/main/scala/alpakka/env/KafkaServerTestcontainers.scala
@@ -7,6 +7,7 @@ import org.testcontainers.utility.DockerImageName
/**
* Uses testcontainers.org to run the
* latest Kafka-Version from Confluent
+ * See also Kafka broker from: /docker/docker-compose.yml
*
* Alternative: [[KafkaServerEmbedded]]
*
@@ -17,10 +18,7 @@ import org.testcontainers.utility.DockerImageName
*/
class KafkaServerTestcontainers {
val logger: Logger = LoggerFactory.getLogger(this.getClass)
- // Pin cp-kafka version for now, because 'latest' does not work on github actions anymore
- // https://hub.docker.com/r/confluentinc/cp-kafka
- // https://docs.confluent.io/platform/current/installation/versions-interoperability.html#cp-and-apache-kafka-compatibility
- val kafkaVersion = "7.5.3"
+ val kafkaVersion = "7.7.0"
val imageName = s"confluentinc/cp-kafka:$kafkaVersion"
val originalPort = 9093
var mappedPort = 1111
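
For context, a standalone sketch of what a Testcontainers-based Kafka wrapper like KafkaServerTestcontainers typically does: start a pinned cp-kafka container and read back the randomly mapped host port. This is not the repository's class, just a simplified illustration using the org.testcontainers KafkaContainer API.

import org.slf4j.{Logger, LoggerFactory}
import org.testcontainers.containers.KafkaContainer
import org.testcontainers.utility.DockerImageName

// Simplified sketch, not the actual KafkaServerTestcontainers class
object KafkaContainerSketch extends App {
  val logger: Logger = LoggerFactory.getLogger(this.getClass)

  val imageName = "confluentinc/cp-kafka:7.7.0"
  val container = new KafkaContainer(DockerImageName.parse(imageName))

  container.start()
  // 9093 is the broker port KafkaContainer exposes inside the container;
  // the host side gets a new random mapped port on every start
  val mappedPort = container.getMappedPort(9093)
  logger.info(s"Kafka reachable at ${container.getBootstrapServers} (mapped port $mappedPort)")

  container.stop()
}

The per-start random mapped port is exactly what forced the restart dance in AlpakkaTrophySpec below, and what the switch to EmbeddedKafka simplifies.
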
25 changes: 0 additions & 25 deletions src/main/scala/alpakka/kafka/DeleteTopicUtil.scala

This file was deleted.
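
The deleted utility's contents are not shown in this view. As a guess at the kind of one-off helper it was, here is a minimal sketch of deleting a topic via Kafka's AdminClient; the broker address and topic name are placeholders.

import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

// Hypothetical reconstruction, not the deleted DeleteTopicUtil
object DeleteTopicSketch extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed local broker

  val adminClient = AdminClient.create(props)
  try {
    // Blocks until the broker confirms the deletion (or the future fails)
    adminClient.deleteTopics(Collections.singleton("some-topic")).all().get()
  } finally {
    adminClient.close()
  }
}
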

40 changes: 20 additions & 20 deletions src/test/scala/alpakka/tcp_to_websockets/AlpakkaTrophySpec.scala
@@ -1,8 +1,9 @@
package alpakka.tcp_to_websockets

- import alpakka.env.{KafkaServerTestcontainers, WebsocketServer}
+ import alpakka.env.WebsocketServer
import alpakka.tcp_to_websockets.hl7mllp.{Hl7Tcp2Kafka, Hl7TcpClient}
import alpakka.tcp_to_websockets.websockets.{Kafka2SSE, Kafka2Websocket}
+ import io.github.embeddedkafka.EmbeddedKafka
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AsyncWordSpec
import org.scalatest.{BeforeAndAfterEachTestData, TestData}
@@ -17,16 +18,13 @@ import util.LogFileScanner
*
* Remarks:
* - The test focus is on log file scanning to check for processed messages and ERRORs
- * - This setup restarts Kafka for each test, so they can run independently. The downside
- * of this is that we have to deal with a new mapped port on each restart.
- * A setup with one Kafka start for all tests is here:
- * https://doc.akka.io/docs/alpakka-kafka/current/testing-testcontainers.html
+ * - This test restarts Kafka for each test, so they can run independently.
* - Since the shutdown of producers/consumers takes a long time, there are WARN msgs in the log
*/
final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAndAfterEachTestData {
val logger: Logger = LoggerFactory.getLogger(this.getClass)

- val kafkaContainer: KafkaServerTestcontainers = KafkaServerTestcontainers()
+ private var bootstrapServer: String = _
var mappedPortKafka: Int = _

var websocketServer: WebsocketServer = _
@@ -39,7 +37,8 @@ final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAnd
val numberOfMessages = 10
Hl7TcpClient(numberOfMessages)

- new LogFileScanner().run(10, 10, "Starting test: Happy path should find all processed messages in WebsocketServer log", "ERROR").length should equal(0)
+ // With EmbeddedKafka there is one ERROR due to port binding at the start
+ new LogFileScanner().run(10, 10, "Starting test: Happy path should find all processed messages in WebsocketServer log", "ERROR").length should equal(1)
// 10 + 1 Initial message
new LogFileScanner().run(10, 10, "Starting test: Happy path should find all processed messages in WebsocketServer log", "WebsocketServer received:").length should equal(numberOfMessages + 1)
}
@@ -103,24 +102,24 @@ final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAnd

// Stopping after half of the msg are processed
Thread.sleep(5000)
logger.info("Re-starting Kafka container...")
kafkaContainer.stop()
kafkaContainer.run()
val newMappedPortKafka = kafkaContainer.mappedPort
logger.info(s"Re-started Kafka on new mapped port: $newMappedPortKafka")
logger.info("Re-starting Kafka...")
EmbeddedKafka.stop()
mappedPortKafka = EmbeddedKafka.start().config.kafkaPort
bootstrapServer = s"localhost:$mappedPortKafka"
logger.info(s"Re-started Kafka on mapped port: $mappedPortKafka")

// Now we need to restart the components sending/receiving to/from Kafka as well,
// to connect to the new mapped port
hl7Tcp2Kafka.stop()
- hl7Tcp2Kafka = Hl7Tcp2Kafka(newMappedPortKafka)
+ hl7Tcp2Kafka = Hl7Tcp2Kafka(mappedPortKafka)
hl7Tcp2Kafka.run()

kafka2Websocket.stop()
- kafka2Websocket = Kafka2Websocket(newMappedPortKafka)
+ kafka2Websocket = Kafka2Websocket(mappedPortKafka)
kafka2Websocket.run()

kafka2SSE.stop()
- kafka2SSE = Kafka2SSE(newMappedPortKafka)
+ kafka2SSE = Kafka2SSE(mappedPortKafka)
kafka2SSE.run()

// 10 + 1 Initial message
@@ -132,9 +131,10 @@ final class AlpakkaTrophySpec extends AsyncWordSpec with Matchers with BeforeAnd
// Write start indicator for the LogFileScanner
logger.info(s"Starting test: ${testData.name}")

logger.info("Starting Kafka container...")
kafkaContainer.run()
mappedPortKafka = kafkaContainer.mappedPort
logger.info("Starting Kafka...")
mappedPortKafka = EmbeddedKafka.start().config.kafkaPort
bootstrapServer = s"localhost:$mappedPortKafka"

logger.info(s"Running Kafka on mapped port: $mappedPortKafka")

// Start other components
@@ -152,8 +152,8 @@ }
}

override protected def afterEach(testData: TestData): Unit = {
logger.info("Stopping Kafka container...")
kafkaContainer.stop()
logger.info("Stopping Kafka...")
EmbeddedKafka.stop()
logger.info("Stopping other components...")
websocketServer.stop()
hl7Tcp2Kafka.stop()
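
The beforeEach/afterEach pattern in this diff boils down to a short EmbeddedKafka lifecycle. A minimal sketch of that lifecycle in isolation (object name and the println are illustrative; the spec uses its logger instead):

import io.github.embeddedkafka.EmbeddedKafka

// Mirrors the start/stop pattern above: start the embedded broker,
// derive the bootstrap server from its effective config, stop it when done
object EmbeddedKafkaLifecycleSketch extends App {
  val kafka = EmbeddedKafka.start() // uses the implicit default EmbeddedKafkaConfig
  val kafkaPort = kafka.config.kafkaPort
  val bootstrapServer = s"localhost:$kafkaPort"
  println(s"Embedded Kafka running at $bootstrapServer")

  // ... run producers/consumers against bootstrapServer here ...

  EmbeddedKafka.stop()
}

Unlike the Testcontainers setup it replaces, the port comes straight from the config of the started instance, so no container restart bookkeeping is needed.
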
2 changes: 1 addition & 1 deletion src/test/scala/util/LogFileScanner.scala
@@ -21,7 +21,7 @@ class LogFileScanner(localLogFilePath: String = "logs/application.log") {
def run(scanDelaySeconds: Int = 0, scanForSeconds: Int = 5, searchAfterPattern: String, pattern: String): List[String] = {
val path: Path = fs.getPath(localLogFilePath)
val pollingInterval = 250.millis
- val maxLineSize: Int = 24 * 1024
+ val maxLineSize: Int = 100 * 1024

// Wait for the components to produce log messages
Thread.sleep(scanDelaySeconds * 1000)
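
A sketch of how maxLineSize and pollingInterval are typically consumed, assuming LogFileScanner tails the log with Pekko Connectors' FileTailSource (the surrounding variable names suggest this, but the rest of the class is not shown); lines longer than maxLineSize fail the stream, hence the bump here.

import java.nio.file.{Path, Paths}

import scala.concurrent.Await
import scala.concurrent.duration._

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.file.scaladsl.FileTailSource
import org.apache.pekko.stream.scaladsl.Sink

// Illustrative sketch, not the repository's LogFileScanner
object LogFileScanSketch extends App {
  implicit val system: ActorSystem = ActorSystem("LogFileScanSketch")

  val path: Path = Paths.get("logs/application.log")
  val pollingInterval = 250.millis
  val maxLineSize = 100 * 1024

  // Tail the log for a bounded time and collect lines containing the pattern
  val errors = FileTailSource
    .createLines(path, maxLineSize = maxLineSize, pollingInterval = pollingInterval)
    .filter(_.contains("ERROR"))
    .takeWithin(5.seconds)
    .runWith(Sink.seq)

  println(s"${Await.result(errors, 10.seconds).size} ERROR lines found")
  system.terminate()
}
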
