Commit
Merge pull request #31 from sky-uk/30-new-api
Unified pre-load and post-load source and new API
lacarvalho91 authored May 21, 2019
2 parents 6f36547 + 5866a44 commit 573ec73
Showing 10 changed files with 733 additions and 608 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# SBT
target/
.bloop/

# IDE
.idea
.idea/
*.iml
.metals/

# Scala
*.class
66 changes: 55 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# kafka-topic-loader
Reads the contents of provided Kafka topics, either the topics in their entirety or up to a consumer group's last committed offset, depending on which `LoadTopicStrategy` you provide.

As of version `1.3.0`, data can be loaded either from complete topics or selected partitions using `TopicLoader.fromTopics` and `TopicLoader.fromPartitions` respectively. By loading from specific partitions the topic loader can be used by multiple application instances with separate streams per set of partitions (see [Alpakka kafka](https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition) and below).
As of version `1.3.0`, data can be loaded from complete topics using `load` or `loadAndRun`.

Add the following to your `build.sbt`:
```scala
@@ -12,21 +12,59 @@ resolvers += "bintray-sky-uk-oss-maven" at "https://dl.bintray.com/sky-uk/oss-ma

```scala
import com.sky.kafka.topicloader.{LoadAll, TopicLoader}
import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer}

val storeRecords: ConsumerRecord[String, SourceEntity] => Future[BusinessEntity] = {
/* store records in akka.Actor for example */
}
implicit val as: ActorSystem = ActorSystem()
implicit val stringDeserializer: Deserializer[String] = new StringDeserializer

val stream =
TopicLoader.fromTopics(LoadAll, NonEmptyList.one("topic-to-load"), storeRecords, new LongDeserializer)
val stream = TopicLoader
  .load[String, String](NonEmptyList.one("topic-to-load"), LoadAll)
  .mapAsync(1)(_ => ??? /* store records in akka.Actor for example */)
  .runWith(Sink.ignore)
```

`loadAndRun` will load the topics, complete the `Future[Done]` from the materialised value and then carry on
running, emitting any new records that appear on the topics. An example use-case for this is a REST API that holds the
contents of a Kafka topic in memory. This kind of application doesn't need to commit offsets and can use the `Future[Done]` to determine readiness.

```scala
object Main extends App {

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

import system.dispatcher

implicit val keyDeserializer: Deserializer[String] = new StringDeserializer
implicit val valueDeserializer: Deserializer[Array[Byte]] = new ByteArrayDeserializer

val state = new SimplifiedState

val (initialLoadingFuture, controlF): (Future[Done], Future[Consumer.Control]) =
TopicLoader
.loadAndRun[String, Array[Byte]](NonEmptyList.one("topic-to-load"))
.to(Sink.foreach(record => state.store.put(record.key, record.value)))
.run()

initialLoadingFuture.foreach(_ => state.isAppReady.set(true))
}

class SimplifiedState {

/**
* API requests may query this state
*/
val store = new ConcurrentHashMap[String, Array[Byte]]()

/**
* A readiness endpoint could be created that queries this
*/
val isAppReady = new AtomicBoolean()
}
```
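The readiness pattern in `SimplifiedState` can be exercised without Kafka at all. The sketch below (the `ReadinessDemo` object and `simulateInitialLoad` method are illustrative, not part of the library) simulates what the stream's `Sink.foreach` and the `initialLoadingFuture` callback do:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

// Standalone copy of the SimplifiedState class from the example above.
class SimplifiedState {
  val store      = new ConcurrentHashMap[String, Array[Byte]]()
  val isAppReady = new AtomicBoolean()
}

object ReadinessDemo {
  // Simulates the stream: store a record, then flip readiness once loading completes.
  def simulateInitialLoad(state: SimplifiedState): Unit = {
    state.store.put("some-key", "some-value".getBytes("UTF-8"))
    state.isAppReady.set(true)
  }
}
```

A readiness endpoint would return healthy only once `isAppReady` is true, while the store can be queried at any time.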

## Configuring your consumer group.id

You should configure `akka.kafka.consumer.kafka-clients.group.id` to match that of your application.
This is especially important for the `LoadCommitted` `LoadTopicStrategy`, so the loader reads up to
the correct committed offset.

e.g.
```
@@ -42,8 +80,14 @@ akka.kafka {
```
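The collapsed hunk above elides the full sample configuration; a minimal sketch of such a consumer config (the `group.id` and `bootstrap.servers` values here are illustrative) might look like:

```
akka.kafka {
  consumer {
    kafka-clients {
      bootstrap.servers = "localhost:9092"
      group.id = "my-application-group"
    }
  }
}
```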

## Source per partition

This is deprecated in favour of a new API for partitioned loading, which is coming soon.

Data can also be loaded from specific partitions using `fromPartitions`. By loading from specific partitions the topic
loader can be used by multiple application instances with separate streams per set of partitions (see [Alpakka kafka](https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#source-per-partition) and below).

```scala
implicit val as: ActorSystem = ActorSystem()
implicit val system = ActorSystem()

val consumerSettings: ConsumerSettings[String, Long] = ???
val doBusinessLogic: ConsumerRecord[String, Long] => Future[Unit] = ???
@@ -54,7 +98,7 @@ val stream: Source[ConsumerMessage.CommittableMessage[String, Long], Consumer.Co
.flatMapConcat {
case (topicPartition, source) =>
TopicLoader
.fromPartitions(LoadAll, NonEmptyList.one(topicPartition), doBusinessLogic, new LongDeserializer)
.fromPartitions(LoadAll, NonEmptyList.one(topicPartition), doBusinessLogic, new LongDeserializer())
.flatMapConcat(_ => source)
}
}
```
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ val RefinedVersion = "0.9.3"
// @formatter:off
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
"com.typesafe.scala-logging" %% "scala-logging" % "3.7.2",
"org.typelevel" %% "cats-core" % CatsVersion,
@@ -50,5 +51,7 @@ libraryDependencies ++= Seq(
)
// @formatter:on

resolvers += "segence" at "https://dl.bintray.com/segence/maven-oss-releases/"

addCommandAlias("checkFmt", ";scalafmt::test; test:scalafmt::test; sbt:scalafmt::test")
addCommandAlias("runFmt", ";scalafmt; test:scalafmt; sbt:scalafmt")
