From 0a874f754cd4f14616e71545f22e7e2d7284235f Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Tue, 4 Jul 2023 10:30:20 +0200 Subject: [PATCH] Use testcontainers-kafka for integration tests --- build.sbt | 3 +-- project/Dependencies.scala | 23 ++++++++-------- .../skafka/ProducerConsumerSpec.scala | 27 ++++++++++++++----- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/build.sbt b/build.sbt index a9b2b6a8..7c85f9d6 100644 --- a/build.sbt +++ b/build.sbt @@ -82,8 +82,7 @@ lazy val tests = (project in file("tests") settings Seq(publish / skip := true, Test / fork := true, Test / parallelExecution := false) dependsOn skafka % "compile->compile;test->test" settings (libraryDependencies ++= Seq( - Kafka.kafka % Test, - `kafka-launcher` % Test, + `testcontainers-kafka` % Test, Slf4j.api % Test, Slf4j.`log4j-over-slf4j` % Test, Logback.core % Test, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a8d25785..9061f687 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -2,21 +2,20 @@ import sbt._ object Dependencies { - val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2" - val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4" - val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6" - val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.5.0" - val `kafka-launcher` = "com.evolutiongaming" %% "kafka-launcher" % "0.1.0" - val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0" - val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" - val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1" - val scalatest = "org.scalatest" %% "scalatest" % "3.2.13" - val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2" - val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0" + val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2" + val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4" + val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6" + val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.5.0" + val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.17" + val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0" + val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" + val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1" + val scalatest = "org.scalatest" %% "scalatest" % "3.2.13" + val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2" + val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0" object Kafka { private val version = "3.4.0" - val kafka = "org.apache.kafka" %% "kafka" % version val clients = "org.apache.kafka" % "kafka-clients" % version } diff --git a/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala b/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala index 99bebbe5..3c781c8c 100644 --- a/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala +++ b/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala @@ -5,13 +5,13 @@ import java.time.Instant import java.time.temporal.ChronoUnit import java.util.UUID import cats.arrow.FunctionK -import cats.data.{NonEmptySet => Nes} +import cats.data.{NonEmptyList, NonEmptySet => Nes} import cats.effect.{Deferred, IO, Ref, Resource} import cats.implicits._ import cats.effect.implicits._ +import com.dimafeng.testcontainers.KafkaContainer import com.evolutiongaming.catshelper.CatsHelper._ import com.evolutiongaming.catshelper.Log -import com.evolutiongaming.kafka.StartKafka import com.evolutiongaming.skafka.FiberWithBlockingCancel._ import com.evolutiongaming.skafka.IOSuite._ import com.evolutiongaming.skafka.consumer._ @@ -29,7 +29,11 @@ import scala.concurrent.duration._ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Matchers { import ProducerConsumerSpec._ - private lazy val shutdown = StartKafka() + private val kafkaContainer = KafkaContainer() + + // We need to start the container before the test suite, because the container might not be ready yet when the + // `consumerOf` / `producerOf` method is called, and the class's `init` method is called before the `beforeAll` method + kafkaContainer.start() private val instant = Instant.now().truncatedTo(ChronoUnit.MILLIS) @@ -37,7 +41,6 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match override def beforeAll() = { super.beforeAll() - shutdown () } @@ -58,7 +61,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match closeAll.unsafeRunTimed(timeout) - shutdown() + kafkaContainer.stop() super.afterAll() } @@ -76,7 +79,10 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match groupId = Some(s"group-$topic"), autoOffsetReset = AutoOffsetReset.Earliest, autoCommit = false, - common = CommonConfig(clientId = Some(UUID.randomUUID().toString)) + common = CommonConfig( + bootstrapServers = NonEmptyList.one(kafkaContainer.bootstrapServers), + clientId = Some(UUID.randomUUID().toString) + ) ) for { @@ -88,7 +94,14 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match } def producerOf(acks: Acks, idempotence: Boolean): Resource[IO, Producer[IO]] = { - val config = ProducerConfig.Default.copy(acks = acks, idempotence = idempotence) + val config = ProducerConfig + .Default + .copy( + acks = acks, + idempotence = idempotence, + common = CommonConfig(bootstrapServers = NonEmptyList.one(kafkaContainer.bootstrapServers)) + ) + for { metrics <- ProducerMetrics.of(CollectorRegistry.empty[IO]) producerOf = ProducerOf.apply1(metrics("clientId").some).mapK(FunctionK.id, FunctionK.id)