Unified pre-load and post-load source and new API #31

Merged 4 commits on May 21, 2019
4 changes: 3 additions & 1 deletion .gitignore
@@ -1,9 +1,11 @@
 # SBT
 target/
+.bloop/
 
 # IDE
-.idea
+.idea/
 *.iml
+.metals/
 
 # Scala
 *.class
66 changes: 55 additions & 11 deletions README.md
@@ -1,7 +1,7 @@
 # kafka-topic-loader
 Reads the contents of the provided Kafka topics, either in their entirety or up to the consumer group's last committed offset, depending on which `LoadTopicStrategy` you provide.
 
-As of version `1.3.0`, data can be loaded either from complete topics or selected partitions using `TopicLoader.fromTopics` and `TopicLoader.fromPartitions` respectively. By loading from specific partitions the topic loader can be used by multiple application instances with separate streams per set of partitions (see [Alpakka kafka](https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition) and below).
+As of version `1.3.0`, data can be loaded from complete topics using `load` or `loadAndRun`.
 
 Add the following to your `build.sbt`:
 ```scala
@@ -12,21 +12,59 @@ resolvers += "bintray-sky-uk-oss-maven" at "https://dl.bintray.com/sky-uk/oss-maven"

 ```scala
 import com.sky.kafka.topicloader.{LoadAll, TopicLoader}
+import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}
 
-val storeRecords: ConsumerRecord[String, SourceEntity] => Future[BusinessEntity] = {
-  /* store records in akka.Actor for example */
-}
 implicit val as: ActorSystem = ActorSystem()
+implicit val stringDeserializer: Deserializer[String] = new StringDeserializer
 
-val stream =
-  TopicLoader.fromTopics(LoadAll, NonEmptyList.one("topic-to-load"), storeRecords, new LongDeserializer)
+val stream = TopicLoader.load[String, String](NonEmptyList.one("topic-to-load"), LoadAll)
+  .mapAsync(1)(_ => ??? /* store records in akka.Actor for example */)
+  .runWith(Sink.ignore)
 ```

`loadAndRun` will load the topics, complete the `Future[Done]` from the materialised value and then carry on
running, emitting any new records that appear on the topics. An example use-case for this is a REST API that holds the
contents of a Kafka topic in memory. This kind of application doesn't need to commit offsets and can use the `Future[Done]` to determine readiness.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import akka.Done
import akka.actor.ActorSystem
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import cats.data.NonEmptyList
import com.sky.kafka.topicloader.TopicLoader
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, Deserializer, StringDeserializer}

import scala.concurrent.Future

object Main extends App {

  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()

  import system.dispatcher

  implicit val keyDeserializer: Deserializer[String] = new StringDeserializer
  implicit val valueDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer

  val state = new SimplifiedState

  val (initialLoadingFuture, controlF): (Future[Done], Future[Consumer.Control]) =
    TopicLoader
      .loadAndRun[String, Array[Byte]](NonEmptyList.one("topic-to-load"))
      .to(Sink.foreach(record => state.store.put(record.key, record.value)))
      .run()

  initialLoadingFuture.foreach(_ => state.isAppReady.set(true))
}

class SimplifiedState {

  /**
    * API requests may query this state
    */
  val store = new ConcurrentHashMap[String, Array[Byte]]()

  /**
    * A readiness endpoint could be created that queries this
    */
  val isAppReady = new AtomicBoolean()
}
```
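
The readiness flag above can back a health endpoint. A hypothetical sketch, assuming akka-http is available (it is not a dependency of this project, and the route name is illustrative):

```scala
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

// Hypothetical readiness route: reports OK only once the initial load has completed.
def readinessRoute(state: SimplifiedState): Route =
  path("ready") {
    get {
      if (state.isAppReady.get()) complete(StatusCodes.OK)
      else complete(StatusCodes.ServiceUnavailable)
    }
  }
```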

## Configuring your consumer group.id

You should configure `akka.kafka.consumer.kafka-clients.group.id` to match your application's consumer group id.
This is especially important for the `LoadCommitted` `LoadTopicStrategy`, which needs it to read up to the
group's last committed offset.

e.g.
```
@@ -42,8 +80,14 @@ akka.kafka {
```
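
A minimal sketch of such a configuration (the group id value here is illustrative, not prescribed by this project):

```
akka.kafka {
  consumer.kafka-clients {
    group.id = my-application-group
  }
}
```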

## Source per partition

This is deprecated in favour of a new API for partitioned loading, which is coming soon.

Data can also be loaded from specific partitions using `fromPartitions`. By loading from specific partitions, the topic
loader can be used by multiple application instances, with a separate stream per set of partitions (see [Alpakka kafka](https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition) and below).

 ```scala
-implicit val as: ActorSystem = ActorSystem()
+implicit val system = ActorSystem()
 
 val consumerSettings: ConsumerSettings[String, Long] = ???
 val doBusinessLogic: ConsumerRecord[String, Long] => Future[Unit] = ???
@@ -54,7 +98,7 @@ val stream: Source[ConsumerMessage.CommittableMessage[String, Long], Consumer.Control] =
     .flatMapConcat {
       case (topicPartition, source) =>
         TopicLoader
-          .fromPartitions(LoadAll, NonEmptyList.one(topicPartition), doBusinessLogic, new LongDeserializer)
+          .fromPartitions(LoadAll, NonEmptyList.one(topicPartition), doBusinessLogic, new LongDeserializer())
           .flatMapConcat(_ => source)
     }
 }
 ```
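
For context, the collapsed lines above construct the partitioned source that this hunk consumes. A sketch of the complete pattern, assuming Alpakka's `Consumer.committablePartitionedSource` and an illustrative topic name (neither is taken from this PR):

```scala
import akka.kafka.Subscriptions
import akka.kafka.scaladsl.Consumer
import cats.data.NonEmptyList
import com.sky.kafka.topicloader.{LoadAll, TopicLoader}
import org.apache.kafka.common.serialization.LongDeserializer

// Fully load each assigned partition first, then switch to its live source.
val sketch =
  Consumer
    .committablePartitionedSource(consumerSettings, Subscriptions.topics("topic-to-load"))
    .flatMapConcat { case (topicPartition, source) =>
      TopicLoader
        .fromPartitions(LoadAll, NonEmptyList.one(topicPartition), doBusinessLogic, new LongDeserializer())
        .flatMapConcat(_ => source)
    }
```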
3 changes: 3 additions & 0 deletions build.sbt
@@ -36,6 +36,7 @@ val RefinedVersion = "0.9.3"
// @formatter:off
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
"org.typelevel" %% "cats-core" % CatsVersion,
@@ -50,5 +51,7 @@ libraryDependencies ++= Seq(
)
// @formatter:on

resolvers += "segence" at "https://dl.bintray.com/segence/maven-oss-releases/"

addCommandAlias("checkFmt", ";scalafmt::test; test:scalafmt::test; sbt:scalafmt::test")
addCommandAlias("runFmt", ";scalafmt; test:scalafmt; sbt:scalafmt")