From 81f1b26ff61a509e415c1be5767380ce1f3df1ac Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Tue, 4 Jul 2023 10:45:39 +0200 Subject: [PATCH] Use testcontainers-kafka for integration tests --- build.sbt | 3 +- project/Dependencies.scala | 22 +- .../skafka/ProducerConsumerSpec.scala | 281 ++++++++++-------- 3 files changed, 174 insertions(+), 132 deletions(-) diff --git a/build.sbt b/build.sbt index e3ff86bb..f6f7fe7f 100644 --- a/build.sbt +++ b/build.sbt @@ -83,8 +83,7 @@ lazy val tests = (project in file("tests") Test / parallelExecution := false) dependsOn skafka % "compile->compile;test->test" settings (libraryDependencies ++= Seq( - Kafka.kafka % Test, - `kafka-launcher` % Test, + `testcontainers-kafka` % Test, Slf4j.api % Test, Slf4j.`log4j-over-slf4j` % Test, Logback.core % Test, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a9b99711..7085fa16 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -2,17 +2,17 @@ import sbt._ object Dependencies { - val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2" - val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4" - val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6" - val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "2.11.0" - val `kafka-launcher` = "com.evolutiongaming" %% "kafka-launcher" % "0.1.0" - val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0" - val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" - val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1" - val scalatest = "org.scalatest" %% "scalatest" % "3.2.13" - val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2" - val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0" + val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2" + val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4" + val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6" + val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "2.11.0" + val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.17" + val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0" + val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" + val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1" + val scalatest = "org.scalatest" %% "scalatest" % "3.2.13" + val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2" + val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0" object Kafka { private val version = "3.4.0" diff --git a/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala b/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala index 3f53951c..21ebceb5 100644 --- a/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala +++ b/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala @@ -5,13 +5,13 @@ import java.time.Instant import java.time.temporal.ChronoUnit import java.util.UUID import cats.arrow.FunctionK -import cats.data.{NonEmptySet => Nes} +import cats.data.{NonEmptyList, NonEmptySet => Nes} import cats.effect.concurrent.{Deferred, Ref} import cats.effect.{IO, Resource} import cats.implicits._ +import com.dimafeng.testcontainers.KafkaContainer import com.evolutiongaming.catshelper.CatsHelper._ import com.evolutiongaming.catshelper.Log -import com.evolutiongaming.kafka.StartKafka import com.evolutiongaming.skafka.FiberWithBlockingCancel._ import com.evolutiongaming.skafka.IOSuite._ import com.evolutiongaming.skafka.consumer._ @@ -29,7 +29,11 @@ import scala.concurrent.duration._ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Matchers { import ProducerConsumerSpec._ - private lazy val shutdown = StartKafka() + private val kafkaContainer = KafkaContainer() + + // We need to start the container before the test suite, because the container might not be ready yet when the + // `consumerOf` / `producerOf` method is called, and the class's `init` method is called before the `beforeAll` method + kafkaContainer.start() private val instant = Instant.now().truncatedTo(ChronoUnit.MILLIS) @@ -37,7 +41,6 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match override def beforeAll() = { super.beforeAll() - shutdown () } @@ -48,20 +51,20 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match producer <- producers } yield producer - val closeAll = producers.distinct.foldMapM { case (producer, release) => - for { - _ <- producer.flush - _ <- release - } yield {} + val closeAll = producers.distinct.foldMapM { + case (producer, release) => + for { + _ <- producer.flush + _ <- release + } yield {} } closeAll.unsafeRunTimed(timeout) - shutdown() + kafkaContainer.stop() super.afterAll() } - val headers = List(Header(key = "key", value = "value".getBytes(UTF_8))) @nowarn("cat=deprecation") @@ -70,26 +73,38 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match listener: Option[RebalanceListener[IO]] ): Resource[IO, Consumer[IO, String, String]] = { - val config = ConsumerConfig.Default.copy( - groupId = Some(s"group-$topic"), - autoOffsetReset = AutoOffsetReset.Earliest, - autoCommit = false, - common = CommonConfig(clientId = Some(UUID.randomUUID().toString))) + val config = ConsumerConfig + .Default + .copy( + groupId = Some(s"group-$topic"), + autoOffsetReset = AutoOffsetReset.Earliest, + autoCommit = false, + common = CommonConfig( + bootstrapServers = NonEmptyList.one(kafkaContainer.bootstrapServers), + clientId = Some(UUID.randomUUID().toString) + ) + ) for { - metrics <- ConsumerMetrics.of(CollectorRegistry.empty[IO]) - consumerOf = ConsumerOf[IO](executor, metrics("clientId").some).mapK(FunctionK.id, FunctionK.id) - consumer <- consumerOf[String, String](config) - _ <- consumer.subscribe(Nes.of(topic), listener).toResource + metrics <- ConsumerMetrics.of(CollectorRegistry.empty[IO]) + consumerOf = ConsumerOf[IO](executor, metrics("clientId").some).mapK(FunctionK.id, FunctionK.id) + consumer <- consumerOf[String, String](config) + _ <- consumer.subscribe(Nes.of(topic), listener).toResource } yield consumer } def producerOf(acks: Acks, idempotence: Boolean): Resource[IO, Producer[IO]] = { - val config = ProducerConfig.Default.copy(acks = acks, idempotence = idempotence) + val config = ProducerConfig + .Default + .copy( + acks = acks, + idempotence = idempotence, + common = CommonConfig(bootstrapServers = NonEmptyList.one(kafkaContainer.bootstrapServers)) + ) for { - metrics <- ProducerMetrics.of(CollectorRegistry.empty[IO]) - producerOf = ProducerOf.apply1(executor, metrics("clientId").some).mapK(FunctionK.id, FunctionK.id) - producer <- producerOf(config) + metrics <- ProducerMetrics.of(CollectorRegistry.empty[IO]) + producerOf = ProducerOf.apply1(executor, metrics("clientId").some).mapK(FunctionK.id, FunctionK.id) + producer <- producerOf(config) } yield { producer.withLogging(Log.empty) } @@ -113,7 +128,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match assigned <- Deferred[IO, Unit] listener = listenerOf(assigned = assigned) consumer = consumerOf(topic, listener.some) - _ <- consumer.use { consumer => + _ <- consumer.use { consumer => val poll = consumer .poll(10.millis) .foreverM[Unit] @@ -156,7 +171,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match // in `onPartitionsAssigned` - aggregate consumer.position responses in Set[Offset] // if the Set has only one element, then it works correctly - val topic = s"${instant.toEpochMilli}-rebalance-listener-correctness-position" + val topic = s"${instant.toEpochMilli}-rebalance-listener-correctness-position" val requiredNumberOfRebalances = 100 def listenerOf( @@ -173,7 +188,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match IO.delay(()) *> IO.shift *> IO.delay(()) - ).lift + ).lift // with IO.shift attempts we make sure that // kafka java consumer.position method would be called from the same thread as consumer.poll // otherwise kafka java consumer throws @@ -181,9 +196,9 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match // which in result would terminate current flatMap chain // and fail the test as positions (Set[Offset]) would be empty position <- consumer.position(partitions.head) - _ <- positions.update(_.+(position)).lift - _ <- rebalanceCounter.update(_ + 1).lift - _ <- assigned.complete(()).lift + _ <- positions.update(_.+(position)).lift + _ <- rebalanceCounter.update(_ + 1).lift + _ <- assigned.complete(()).lift } yield () } @@ -194,13 +209,16 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match } val result = for { - testCompleted <- Deferred[IO, Unit] - positions <- Ref.of[IO, Set[Offset]](Set.empty) + testCompleted <- Deferred[IO, Unit] + positions <- Ref.of[IO, Set[Offset]](Set.empty) rebalanceCounter <- Ref.of[IO, Int](0) completeTestIfNeeded = for { rebalanceCounter <- rebalanceCounter.get - positions <- positions.get - completed <- if (rebalanceCounter >= requiredNumberOfRebalances || positions.size > 1) testCompleted.complete(()).handleError(_ => ()) else IO.unit + positions <- positions.get + completed <- + if (rebalanceCounter >= requiredNumberOfRebalances || positions.size > 1) + testCompleted.complete(()).handleError(_ => ()) + else IO.unit } yield completed consumer = consumerOf(topic, none) @@ -215,12 +233,12 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match testStep = Deferred[IO, Unit].flatMap { assigned => val x = for { - _ <- Resource.release(completeTestIfNeeded) + _ <- Resource.release(completeTestIfNeeded) consumer <- consumer - listener = listenerOf(positions, rebalanceCounter, assigned) - _ <- consumer.subscribe(Nes.of(topic), listener).toResource + listener = listenerOf(positions, rebalanceCounter, assigned) + _ <- consumer.subscribe(Nes.of(topic), listener).toResource poll = { - consumer.poll(10.millis).onError({ case e => IO.delay(println(s"${System.nanoTime()} poll failed $e")) }) + consumer.poll(10.millis).onError({ case e => IO.delay(println(s"${System.nanoTime()} poll failed $e")) }) } // without IO.cancelBoundary consumer.poll is called after consumer was closed // https://github.com/typelevel/cats-effect/issues/326#issuecomment-416925044 @@ -230,11 +248,20 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match } _ <- Resource - .make(testStep.foreverM.start) {_.cancel} + .make(testStep.foreverM.start) { _.cancel } .use { _ => - testCompleted.get.timeout(33.seconds) - .onError({ case _ => rebalanceCounter.get.map(c => - println(s"rebalance listener correctness (position): onPartitionsAssigned $c out of $requiredNumberOfRebalances times")) + testCompleted + .get + .timeout(33.seconds) + .onError({ + case _ => + rebalanceCounter + .get + .map(c => + println( + s"rebalance listener correctness (position): onPartitionsAssigned $c out of $requiredNumberOfRebalances times" + ) + ) }) } positions <- positions.get @@ -257,14 +284,14 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match // if the List == [0..N), then it works correctly, // as every commit was successfully executed - val topic = s"${instant.toEpochMilli}-rebalance-listener-correctness-commit" + val topic = s"${instant.toEpochMilli}-rebalance-listener-correctness-commit" val requiredNumberOfRebalances = 100 def listenerOf( - offsets: Ref[IO, List[Offset]], - rebalanceCounter: Ref[IO, Int], - assigned: Deferred[IO, Unit] - ): RebalanceListener1[IO] = { + offsets: Ref[IO, List[Offset]], + rebalanceCounter: Ref[IO, Int], + assigned: Deferred[IO, Unit] + ): RebalanceListener1[IO] = { new RebalanceListener1WithConsumer[IO] { def onPartitionsAssigned(partitions: Nes[TopicPartition]) = { @@ -277,9 +304,11 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match def onPartitionsRevoked(partitions: Nes[TopicPartition]) = for { committed <- consumer.committed(partitions) - offset = committed.headOption.map(_._2.offset).getOrElse(Offset.min) - _ <- offsets.update(_ :+ offset).lift - a <- consumer.commit(partitions.map(_ -> OffsetAndMetadata(Offset.unsafe(offset.value + 1))).toNonEmptyList.toNem) + offset = committed.headOption.map(_._2.offset).getOrElse(Offset.min) + _ <- offsets.update(_ :+ offset).lift + a <- consumer.commit( + partitions.map(_ -> OffsetAndMetadata(Offset.unsafe(offset.value + 1))).toNonEmptyList.toNem + ) } yield a def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty @@ -287,12 +316,12 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match } val result = for { - testCompleted <- Deferred[IO, Unit] - offsets <- Ref.of[IO, List[Offset]](List.empty) + testCompleted <- Deferred[IO, Unit] + offsets <- Ref.of[IO, List[Offset]](List.empty) rebalanceCounter <- Ref.of[IO, Int](0) completeTestIfNeeded = for { rebalanceCounter <- rebalanceCounter.get - completed <- if (rebalanceCounter >= requiredNumberOfRebalances) testCompleted.complete(()) else IO.unit + completed <- if (rebalanceCounter >= requiredNumberOfRebalances) testCompleted.complete(()) else IO.unit } yield completed consumer = consumerOf(topic, none) @@ -305,8 +334,8 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match testStep = Deferred[IO, Unit].flatMap { assigned => consumer.use { consumer => - val listener = listenerOf(offsets, rebalanceCounter, assigned) - val poll = consumer.poll(10.millis) + val listener = listenerOf(offsets, rebalanceCounter, assigned) + val poll = consumer.poll(10.millis) val subscribe = consumer.subscribe(Nes.of(topic), listener) subscribe *> Resource @@ -321,9 +350,18 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match _.cancel } .use { _ => - testCompleted.get.timeout(33.seconds) - .onError({ case _ => rebalanceCounter.get.map(c => - println(s"rebalance listener correctness (commit): onPartitionsAssigned $c out of $requiredNumberOfRebalances times")) + testCompleted + .get + .timeout(33.seconds) + .onError({ + case _ => + rebalanceCounter + .get + .map(c => + println( + s"rebalance listener correctness (commit): onPartitionsAssigned $c out of $requiredNumberOfRebalances times" + ) + ) }) } offsets <- offsets.get @@ -359,17 +397,19 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match val consumer = consumerOf(topic, none) val producer = producerOf(Acks.One, idempotence = false) - producer.use { producer => - for { - _ <- producer.send(ProducerRecord(topic, "value1", "key")).flatten - _ <- producer.send(ProducerRecord(topic, "value2", "key")).flatten - } yield () - }.unsafeRunSync() + producer + .use { producer => + for { + _ <- producer.send(ProducerRecord(topic, "value1", "key")).flatten + _ <- producer.send(ProducerRecord(topic, "value2", "key")).flatten + } yield () + } + .unsafeRunSync() val consumeRecords: IO[List[(String, String)]] = consumer.use { consumer => for { - _ <- consumer.subscribe(Nes.of(topic), listener) - poll = consumer.poll(10.millis) + _ <- consumer.subscribe(Nes.of(topic), listener) + poll = consumer.poll(10.millis) records <- poll.iterateUntil(_.values.nonEmpty) } yield records.values.values.flatMap(_.toList.map(r => (r.key.get.value, r.value.get.value))).toList } @@ -392,7 +432,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match } yield { val topic = s"${instant.toEpochMilli}-$idx-$acks" - val name = s"[topic:$topic,acks:$acks]" + val name = s"[topic:$topic,acks:$acks]" def produce(record: ProducerRecord[String, String]) = producer.send(record).flatten.unsafeRunSync() @@ -402,50 +442,51 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match .unsafeRunSync() test(s"$name produce and consume record") { - val key = "key1" - val value = "value1" + val key = "key1" + val value = "value1" val timestamp = instant val record = ProducerRecord( - topic = topic, - value = Some(value), - key = Some(key), + topic = topic, + value = Some(value), + key = Some(key), timestamp = Some(timestamp), - headers = headers) + headers = headers + ) val metadata = produce(record) - val offset = if (acks == Acks.None) none[Offset] else Offset.min.some + val offset = if (acks == Acks.None) none[Offset] else Offset.min.some metadata.offset shouldEqual offset val records = consumer.consume(timeout).map(Record(_)) val expected = Record( record = ConsumerRecord( - topicPartition = metadata.topicPartition, - offset = Offset.min, + topicPartition = metadata.topicPartition, + offset = Offset.min, timestampAndType = Some(TimestampAndType(timestamp, TimestampType.create)), - key = Some(WithSize(key, 4)), - value = Some(WithSize(value, 6)), - headers = Nil), - headers = List(Record.Header(key = "key", value = "value"))) + key = Some(WithSize(key, 4)), + value = Some(WithSize(value, 6)), + headers = Nil + ), + headers = List(Record.Header(key = "key", value = "value")) + ) records shouldEqual List(expected) } test(s"$name produce and delete record") { - val key = "key2" - val record = ProducerRecord(topic, value = "value2", key = key) + val key = "key2" + val record = ProducerRecord(topic, value = "value2", key = key) val metadata = produce(record) - val offset = if (acks == Acks.None) none[Offset] else Offset.unsafe(1L).some + val offset = if (acks == Acks.None) none[Offset] else Offset.unsafe(1L).some metadata.offset shouldEqual offset - val keyAndValues = consumer.consume(timeout).map { record => (record.key.map(_.value), record.value.map(_.value)) } + val keyAndValues = + consumer.consume(timeout).map { record => (record.key.map(_.value), record.value.map(_.value)) } keyAndValues shouldEqual List((Some(key), record.value)) val timestamp = instant - val delete = ProducerRecord[String, String]( - topic = topic, - key = Some(key), - timestamp = Some(timestamp), - headers = headers) + val delete = + ProducerRecord[String, String](topic = topic, key = Some(key), timestamp = Some(timestamp), headers = headers) val deleteMetadata = produce(delete) @@ -453,22 +494,21 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match val expected = Record( record = ConsumerRecord[String, String]( - topicPartition = deleteMetadata.topicPartition, - offset = Offset.unsafe(2), + topicPartition = deleteMetadata.topicPartition, + offset = Offset.unsafe(2), timestampAndType = Some(TimestampAndType(timestamp, TimestampType.create)), - key = Some(WithSize(key, 4)), - headers = Nil), - headers = List(Record.Header(key = "key", value = "value"))) + key = Some(WithSize(key, 4)), + headers = Nil + ), + headers = List(Record.Header(key = "key", value = "value")) + ) records shouldEqual List(expected) } test(s"$name produce and consume empty record") { val timestamp = instant - val empty = ProducerRecord[String, String]( - topic = topic, - timestamp = Some(timestamp), - headers = headers) + val empty = ProducerRecord[String, String](topic = topic, timestamp = Some(timestamp), headers = headers) val metadata = produce(empty) @@ -476,29 +516,32 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match val expected = Record( record = ConsumerRecord[String, String]( - topicPartition = metadata.topicPartition, - offset = Offset.unsafe(3), + topicPartition = metadata.topicPartition, + offset = Offset.unsafe(3), timestampAndType = Some(TimestampAndType(timestamp, TimestampType.create)), - headers = Nil), - headers = List(Record.Header(key = "key", value = "value"))) + headers = Nil + ), + headers = List(Record.Header(key = "key", value = "value")) + ) records shouldEqual List(expected) } test(s"$name commit and subscribe from last committed position") { - val key = "key3" - val value = "value3" + val key = "key3" + val value = "value3" val timestamp = instant Await.result(consumer.commit(), timeout) consumerRelease.unsafeRunSync() val record = ProducerRecord( - topic = topic, - value = Some(value), - key = Some(key), + topic = topic, + value = Some(value), + key = Some(key), timestamp = Some(timestamp), - headers = headers) + headers = headers + ) val metadata = produce(record) @@ -508,13 +551,15 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match val expected = Record( record = ConsumerRecord( - topicPartition = metadata.topicPartition, - offset = Offset.unsafe(4L), + topicPartition = metadata.topicPartition, + offset = Offset.unsafe(4L), timestampAndType = Some(TimestampAndType(timestamp, TimestampType.create)), - key = WithSize(key, 4).some, - value = WithSize(value, 6).some, - headers = Nil), - headers = List(Record.Header(key = "key", value = "value"))) + key = WithSize(key, 4).some, + value = WithSize(value, 6).some, + headers = Nil + ), + headers = List(Record.Header(key = "key", value = "value")) + ) records shouldEqual List(expected) @@ -534,9 +579,7 @@ object ProducerConsumerSpec { val headers = record.headers.map { header => Header(key = header.key, value = new String(header.value, UTF_8)) } - Record( - record = record.copy(headers = Nil), - headers = headers) + Record(record = record.copy(headers = Nil), headers = headers) } final case class Header(key: String, value: String) @@ -549,7 +592,7 @@ object ProducerConsumerSpec { def consume(attempts: Int): List[ConsumerRecord[String, String]] = { if (attempts <= 0) Nil else { - val future = self.poll(100.millis).unsafeToFuture() + val future = self.poll(100.millis).unsafeToFuture() val records = Await.result(future, timeout).values.values.flatMap(_.toList) if (records.isEmpty) consume(attempts - 1) else records.toList @@ -562,4 +605,4 @@ object ProducerConsumerSpec { def commit() = self.commit.unsafeToFuture() } -} \ No newline at end of file +}