Skip to content

Commit

Permalink
Use testcontainers-kafka for integration tests (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
Z1kkurat committed Jul 13, 2023
1 parent f85178c commit 20b51f7
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 21 deletions.
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ lazy val tests = (project in file("tests")
settings Seq(publish / skip := true, Test / fork := true, Test / parallelExecution := false)
dependsOn skafka % "compile->compile;test->test"
settings (libraryDependencies ++= Seq(
Kafka.kafka % Test,
`kafka-launcher` % Test,
`testcontainers-kafka` % Test,
Slf4j.api % Test,
Slf4j.`log4j-over-slf4j` % Test,
Logback.core % Test,
Expand Down
23 changes: 11 additions & 12 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ import sbt._

object Dependencies {

val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2"
val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.5.0"
val `kafka-launcher` = "com.evolutiongaming" %% "kafka-launcher" % "0.1.0"
val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0"
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.13"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0"
val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2"
val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.5.0"
val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.17"
val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0"
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.13"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0"

object Kafka {
private val version = "3.4.0"
val kafka = "org.apache.kafka" %% "kafka" % version
val clients = "org.apache.kafka" % "kafka-clients" % version
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.UUID
import cats.arrow.FunctionK
import cats.data.{NonEmptySet => Nes}
import cats.data.{NonEmptyList, NonEmptySet => Nes}
import cats.effect.{Deferred, IO, Ref, Resource}
import cats.implicits._
import cats.effect.implicits._
import com.dimafeng.testcontainers.KafkaContainer
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.StartKafka
import com.evolutiongaming.skafka.FiberWithBlockingCancel._
import com.evolutiongaming.skafka.IOSuite._
import com.evolutiongaming.skafka.consumer._
Expand All @@ -29,15 +29,18 @@ import scala.concurrent.duration._
class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Matchers {
import ProducerConsumerSpec._

private lazy val shutdown = StartKafka()
private val kafkaContainer = KafkaContainer()

// We need to start the container before the test suite, because the container might not be ready yet when the
// `consumerOf` / `producerOf` method is called, and the class's `init` method is called before the `beforeAll` method
kafkaContainer.start()

private val instant = Instant.now().truncatedTo(ChronoUnit.MILLIS)

private val timeout = 1.minute

override def beforeAll() = {
super.beforeAll()
shutdown
()
}

Expand All @@ -58,7 +61,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match

closeAll.unsafeRunTimed(timeout)

shutdown()
kafkaContainer.stop()
super.afterAll()
}

Expand All @@ -76,7 +79,10 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match
groupId = Some(s"group-$topic"),
autoOffsetReset = AutoOffsetReset.Earliest,
autoCommit = false,
common = CommonConfig(clientId = Some(UUID.randomUUID().toString))
common = CommonConfig(
bootstrapServers = NonEmptyList.one(kafkaContainer.bootstrapServers),
clientId = Some(UUID.randomUUID().toString)
)
)

for {
Expand All @@ -88,7 +94,14 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match
}

def producerOf(acks: Acks, idempotence: Boolean): Resource[IO, Producer[IO]] = {
val config = ProducerConfig.Default.copy(acks = acks, idempotence = idempotence)
val config = ProducerConfig
.Default
.copy(
acks = acks,
idempotence = idempotence,
common = CommonConfig(bootstrapServers = NonEmptyList.one(kafkaContainer.bootstrapServers))
)

for {
metrics <- ProducerMetrics.of(CollectorRegistry.empty[IO])
producerOf = ProducerOf.apply1(metrics("clientId").some).mapK(FunctionK.id, FunctionK.id)
Expand Down

0 comments on commit 20b51f7

Please sign in to comment.