Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use testcontainers-kafka for integration tests #355

Merged
merged 1 commit into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,7 @@ lazy val tests = (project in file("tests")
settings Seq(publish / skip := true, Test / fork := true, Test / parallelExecution := false)
dependsOn skafka % "compile->compile;test->test"
settings (libraryDependencies ++= Seq(
Kafka.kafka % Test,
`kafka-launcher` % Test,
`testcontainers-kafka` % Test,
Slf4j.api % Test,
Slf4j.`log4j-over-slf4j` % Test,
Logback.core % Test,
Expand Down
23 changes: 11 additions & 12 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ import sbt._

object Dependencies {

val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2"
val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.5.0"
val `kafka-launcher` = "com.evolutiongaming" %% "kafka-launcher" % "0.1.0"
val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0"
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.13"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0"
val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2"
val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4"
val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6"
val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.5.0"
val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.17"
val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0"
val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2"
val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1"
val scalatest = "org.scalatest" %% "scalatest" % "3.2.13"
val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2"
val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0"

object Kafka {
private val version = "3.4.0"
val kafka = "org.apache.kafka" %% "kafka" % version
val clients = "org.apache.kafka" % "kafka-clients" % version
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.UUID
import cats.arrow.FunctionK
import cats.data.{NonEmptySet => Nes}
import cats.data.{NonEmptyList, NonEmptySet => Nes}
import cats.effect.{Deferred, IO, Ref, Resource}
import cats.implicits._
import cats.effect.implicits._
import com.dimafeng.testcontainers.KafkaContainer
import com.evolutiongaming.catshelper.CatsHelper._
import com.evolutiongaming.catshelper.Log
import com.evolutiongaming.kafka.StartKafka
import com.evolutiongaming.skafka.FiberWithBlockingCancel._
import com.evolutiongaming.skafka.IOSuite._
import com.evolutiongaming.skafka.consumer._
Expand All @@ -29,15 +29,18 @@ import scala.concurrent.duration._
class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Matchers {
import ProducerConsumerSpec._

private lazy val shutdown = StartKafka()
private val kafkaContainer = KafkaContainer()

// We need to start the container before the test suite, because the container might not be ready yet when the
// `consumerOf` / `producerOf` method is called, and the class's `init` method is called before the `beforeAll` method
kafkaContainer.start()
Z1kkurat marked this conversation as resolved.
Show resolved Hide resolved

private val instant = Instant.now().truncatedTo(ChronoUnit.MILLIS)

private val timeout = 1.minute

override def beforeAll() = {
super.beforeAll()
shutdown
()
}

Expand All @@ -58,7 +61,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match

closeAll.unsafeRunTimed(timeout)

shutdown()
kafkaContainer.stop()
super.afterAll()
}

Expand All @@ -76,7 +79,10 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match
groupId = Some(s"group-$topic"),
autoOffsetReset = AutoOffsetReset.Earliest,
autoCommit = false,
common = CommonConfig(clientId = Some(UUID.randomUUID().toString))
common = CommonConfig(
bootstrapServers = NonEmptyList.one(kafkaContainer.bootstrapServers),
clientId = Some(UUID.randomUUID().toString)
)
)

for {
Expand All @@ -88,7 +94,14 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match
}

def producerOf(acks: Acks, idempotence: Boolean): Resource[IO, Producer[IO]] = {
val config = ProducerConfig.Default.copy(acks = acks, idempotence = idempotence)
val config = ProducerConfig
.Default
.copy(
acks = acks,
idempotence = idempotence,
common = CommonConfig(bootstrapServers = NonEmptyList.one(kafkaContainer.bootstrapServers))
)

for {
metrics <- ProducerMetrics.of(CollectorRegistry.empty[IO])
producerOf = ProducerOf.apply1(metrics("clientId").some).mapK(FunctionK.id, FunctionK.id)
Expand Down