An end-to-end data streaming pipeline with a dummy data generator, a Kafka consumer, and processor modules, built with well-established open-source libraries.
This project applies Diamond Architecture design patterns and follows a Kappa Architecture data processing strategy.
For the motivation on using the Diamond Architecture or the Kappa Architecture, click on the following links:
- JDK - At least Java 9.0. Recommended > 11.0
- SBT - At least 1.9.0
- Docker Compose
- Kafkacat
- Compile src:
sbt c;
- Compile test:
sbt ct;
- Format:
sbt styleFix;
- Test:
sbt t;
- Code coverage:
sbt runCoverage;
Right now, the only integration test available is for Apache Flink. Before you run it, a Docker environment must be available on your computer. The integration test runs on the TestContainers library, so there is no need to do anything else in Docker (other than having an environment available).
sbt flinkIT;
The modules use Kafka topics to communicate with each other. Thus, the Kafka consumer must be running on Docker. Other than that, the rest of the application can be executed from the sbt command line.
- Start the kafka consumer
docker-compose -f ./docker/docker-compose-kafka.yml up -d;
- Start the Data Generator
sbt generateData;
- Execute Apache Flink or Spark
sbt runFlink;
sbt runSpark;
- Stop Docker:
docker-compose -f ./docker/docker-compose-kafka.yml down;
There are two custom images in this project: data-generator and processor-spark. These images have to be generated.
If you have already generated the images, you can skip to the section Start the flow of events.
sbt data-generator/assembly; data-generator/docker:publishLocal;
sbt processor-spark/c;
The Spark image is built from a Dockerfile.
docker build ./03-o-processor-spark/docker/ -t cluster-apache-spark:3.4.1;
The first step is to have some events flowing through the Kafka topics.
docker-compose \
-f ./docker/docker-compose-generator.yml \
-f ./docker/docker-compose-kafka.yml \
up -d
Composing up both images together will trigger the data-generator autonomously. Before launching the app, inspect the Kafka topics and verify that the following topics exist:
kcat -b localhost:9092 -L
You should see something like this:
Metadata for all topics (from broker 1: localhost:9092/1):
1 brokers:
broker 1 at localhost:9092 (controller)
4 topics:
topic "data-generator-gps" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "data-generator-pp" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "_schemas" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "__consumer_offsets" with 50 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
[...]
- Init the Spark Cluster
docker-compose \
-f ./docker/docker-compose-spark.yml \
up -d
- Generate the Spark jar file
sbt "processor-spark / assembly;"
- Copy the fat jar into the folder app-jar of the running container (master)
docker cp "./04-o-processor-spark/target/scala-3.3.0/spark-app.jar" "docker-spark-master-1:/opt/spark/app-jar"
- Execute Spark via spark-submit
spark-submit uses the 'client' deploy mode here, so the driver runs on the master node and the job's stdout is printed to the terminal console. You can also pass --deploy-mode cluster, which causes the driver to run on one of the worker nodes. For experimentation, client mode is slightly more convenient.
docker exec docker-spark-master-1 /opt/spark/bin/spark-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1 \
--master spark://spark:7077 \
--deploy-mode client \
--driver-memory 1G \
--executor-memory 2G \
--total-executor-cores 2 \
--class com.fortyseven.processor.spark.run \
app-jar/spark-app.jar
Once you have finished running the app, stop Docker:
docker-compose \
-f ./docker/docker-compose-generator.yml \
-f ./docker/docker-compose-kafka.yml \
-f ./docker/docker-compose-spark.yml \
down
More information at Relationship with Scala 2 Implicits.
The scala.compiletime package contains helper definitions that provide support for compile-time operations over values.
The project uses two methods from the package scala.compiletime:
- error
- requireConst
The error method is used to produce user-defined compile errors during inline expansion. If an inline expansion results in a call error(msgStr) the compiler produces an error message containing the given msgStr.
The requireConst method checks at compile time that the provided value is a constant after inlining and constant folding.
These two methods are used together in the apply method for the refined type Latitude.
import scala.compiletime.{error, requireConst}
opaque type Latitude = Double
object Latitude:
  inline def apply(coordinate: Double): Latitude =
    requireConst(coordinate)
    inline if coordinate < -90.0 || coordinate > 90.0
    then error("Invalid latitude value. Accepted coordinate values are between -90.0 and 90.0.")
    else coordinate
end Latitude
val latOk: Latitude = Latitude(-3) // Compiles fine.
val latKo: Latitude = Latitude(91) // Won't compile and will display the error message.
More information at Compile-time operations.
A context bound is a shorthand for expressing the common pattern of a context parameter that depends on a type parameter.
Examples in module 03-u-data-generator:
final class DataGenerator[F[_]: Async: Parallel]
final class ModelGenerators[F[_]: Temporal]
Examples in module 04-o-processor-flink:
final class FlinkDataProcessor[F[_]: Applicative]
final class FlinkProcessor[F[_]: Async]
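As a minimal sketch of what a context bound expands to, the snippet below uses a hypothetical type class Describe (it is not part of this project or of Cats). The two methods are meant to be equivalent: the first uses a context bound, the second spells out the using clause that the bound stands for.

```scala
// Hypothetical type class, used only for illustration.
trait Describe[A]:
  def describe(a: A): String

// Given instance that satisfies the bound for Int.
given Describe[Int] with
  def describe(a: Int): String = s"Int($a)"

// Context bound: `A: Describe` requires a given Describe[A] at the call site.
def withBound[A: Describe](a: A): String =
  summon[Describe[A]].describe(a)

// The same requirement written as an explicit using clause.
def withUsing[A](a: A)(using d: Describe[A]): String =
  d.describe(a)

@main def contextBoundDemo(): Unit =
  println(withBound(42))
  println(withUsing(42))
```

In the project's classes, the bound plays the same role for F[_]: for instance, F[_]: Async asks for a given Async[F] wherever the class is instantiated.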
More information at Context Bounds.
Beware: The compiler expands enums and their cases to code that only uses Scala's other language features. As such, enums in Scala are convenient syntactic sugar, but they are not essential to understand Scala's core.
An enumeration is used to define a type consisting of a set of named values.
enum KafkaCompressionType:
  case none, gzip, snappy, lz4, zstd
The companion object of an enum also defines three utility methods. The valueOf method obtains an enum value by its name. The values method returns all enum values defined in an enumeration in an Array. The fromOrdinal method obtains an enum value from its ordinal (Int) value.
scala> KafkaCompressionType.valueOf("lz4")
val res: com.fortyseven.common.configuration.refinedTypes.KafkaCompressionType = lz4
scala> KafkaCompressionType.valueOf("lz5")
//java.lang.IllegalArgumentException: enum case not found: lz5
//at com.fortyseven.common.configuration.refinedTypes$KafkaCompressionType$.valueOf(refinedTypes.scala:47)
scala> KafkaCompressionType.values
val res: Array[com.fortyseven.common.configuration.refinedTypes.KafkaCompressionType] = Array(none, gzip, snappy, lz4, zstd)
scala> KafkaCompressionType.fromOrdinal(4)
val res: com.fortyseven.common.configuration.refinedTypes.KafkaCompressionType = zstd
scala> KafkaCompressionType.fromOrdinal(5)
//java.util.NoSuchElementException: 5
//at com.fortyseven.common.configuration.refinedTypes$KafkaCompressionType$.fromOrdinal(refinedTypes.scala:47)
More information at Enums.
Extension methods allow one to add methods to a type after the type is defined.
opaque type Latitude = Double
extension (coordinate: Latitude) def value: Double = coordinate
scala> Latitude(90)
val res: com.fortyseven.domain.model.types.refinedTypes.Latitude = 90.0
scala> Latitude(90).value
val res: Double = 90.0
More information at Extension Methods.
Given instances (or, simply, "givens") define "canonical" values of certain types that serve for synthesizing arguments to context parameters.
The name of a given can be left out. If the name of a given is missing, the compiler will synthesize a name from the implemented type(s).
In this project, givens are defined to fill the canonical value of a method that has a using clause in its parameters. For example:
- Codecs:
import vulcan.{AvroError, Codec}
given latitudeCodec: Codec[Latitude] = Codec.double.imapError(Latitude.from(_)
.leftMap(e => AvroError(s"AvroError: ${e.message}")))(_.value)
- ConfigReader:
import pureconfig.ConfigReader
import pureconfig.error.ExceptionThrown
given ConfigReader[KafkaCompressionType] =
  ConfigReader.fromString(KafkaCompressionType.from(_).leftMap(ExceptionThrown.apply))
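The same mechanism can be tried with the standard library alone. The sketch below assumes a hypothetical case class Reading (not part of this project) and defines an anonymous given Ordering for it; sorted and max then pick it up without it being passed explicitly.

```scala
// Hypothetical domain type, used only for illustration.
final case class Reading(sensor: String, value: Int)

// Anonymous given: no name is written, so the compiler synthesizes one.
given Ordering[Reading] = Ordering.by[Reading, Int](_.value)

val readings: List[Reading] =
  List(Reading("a", 35), Reading("b", 10), Reading("c", 22))

// `sorted` and `max` take an implicit Ordering, which the given above provides.
val ascending: List[Reading] = readings.sorted
val highest: Reading = readings.max

@main def givenDemo(): Unit =
  println(ascending.map(_.sensor))
  println(highest)
```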
More information at Given Instances.
Implicit conversions are defined by given instances of the scala.Conversion class.
This given Conversion is inside the companion object of the refined type NonEmptyString. The compiler will convert a NonEmptyString to a String every time a method expects to receive a String but a NonEmptyString is provided.
given Conversion[NonEmptyString, String] with
  override def apply(x: NonEmptyString): String = x
This snippet of code is from kafka-consumer. Here, a given instance of the type Conversion[KafkaAutoOffsetReset, AutoOffsetReset] is in scope, so the compiler applies it when it finds a method that requires the second type (AutoOffsetReset) but is provided with the first type (KafkaAutoOffsetReset).
import com.fortyseven.common.configuration.refinedTypes.KafkaAutoOffsetReset // Our refined type
import fs2.kafka.AutoOffsetReset // Actual type of Kafka's API
given Conversion[KafkaAutoOffsetReset, AutoOffsetReset] with
  override def apply(x: KafkaAutoOffsetReset): AutoOffsetReset = x match // Match is exhaustive
    case KafkaAutoOffsetReset.Earliest => AutoOffsetReset.Earliest
    case KafkaAutoOffsetReset.Latest => AutoOffsetReset.Latest
    case KafkaAutoOffsetReset.None => AutoOffsetReset.None
val consumerSettings = ConsumerSettings[F, String, Array[Byte]]
  .withAutoOffsetReset(consumerConfig.autoOffsetReset) // Receives a KafkaAutoOffsetReset but expects AutoOffsetReset. Compiles
  .withBootstrapServers(kc.broker.brokerAddress)
  .withGroupId(consumerConfig.groupId)
More information at Implicit Conversions.
inline is a new soft modifier that guarantees that a definition will be inlined at the point of use.
In this project, inline is used only with def (methods), but it can also be used with val (values).
inline def apply(coordinate: Double): Latitude =
  requireConst(coordinate)
  inline if coordinate < -90.0 || coordinate > 90.0
  then error("Invalid latitude value. Accepted coordinate values are between -90.0 and 90.0.")
  else coordinate
This method will always be inlined at the point of call. In the inlined code, an if-then-else with a constant condition will be rewritten to its then- or else-part.
More information at Inline.
Scala 3 has a new "quiet" syntax for control expressions that does not rely on enclosing the condition in parentheses, and also allows dropping the parentheses or braces around the generators of a for-expression.
- Quiet syntax:
def from(intCandidate: Int): Either[Throwable, PositiveInt] =
  if intCandidate < 0
  then Left(new IllegalStateException(s"The provided int $intCandidate is not positive."))
  else Right(intCandidate)
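The quiet syntax also covers for and while. A minimal sketch, using two hypothetical helpers (evenSquares and countDigits) that are not part of this project:

```scala
// Hypothetical helper: the generators of the for-expression need no braces or parentheses.
def evenSquares(limit: Int): List[Int] =
  for
    n <- (1 to limit).toList
    if n % 2 == 0
  yield n * n

// Hypothetical helper: `while ... do` needs no parentheses around the condition.
def countDigits(number: Int): Int =
  var remaining = math.abs(number)
  var digits = 1
  while remaining >= 10 do
    remaining /= 10
    digits += 1
  digits

@main def controlSyntaxDemo(): Unit =
  println(evenSquares(6))
  println(countDigits(12345))
```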
More information at New Control Syntax.
Opaque type aliases provide type abstraction without any overhead.
import java.util.UUID
object ids:
  opaque type BicycleId = UUID
  object BicycleId:
    def apply(id: UUID): BicycleId = id
  extension (bicycleId: BicycleId)
    def value: UUID = bicycleId
This introduces BicycleId as a new abstract type, which is implemented as UUID. The fact that BicycleId is the same as UUID is only known in the scope where BicycleId is defined, which in the above example corresponds to the object ids. In other words, within that scope it is treated as a type alias, but this is opaque to the outside world where, in consequence, BicycleId is seen as an abstract type that has nothing to do with UUID.
The public API of BicycleId consists of the apply method defined in the companion object. It converts from UUID to BicycleId values. Moreover, an operation value that converts the other way is defined as an extension method on BicycleId values.
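The opacity can be checked at compile time. The sketch below repeats the ids object so that it is self-contained and uses scala.compiletime.testing.typeChecks from the standard library; the value names (rawId, bicycleId, leaksAsUuid, usesPublicApi) are hypothetical and only serve the illustration.

```scala
import java.util.UUID
import scala.compiletime.testing.typeChecks

object ids:
  opaque type BicycleId = UUID

  object BicycleId:
    def apply(id: UUID): BicycleId = id

  extension (bicycleId: BicycleId)
    def value: UUID = bicycleId

import ids.*

val rawId: UUID = UUID.fromString("123e4567-e89b-12d3-a456-426614174000")
val bicycleId: BicycleId = BicycleId(rawId)

// Outside `ids`, a BicycleId is not known to be a UUID, so this should not type check.
val leaksAsUuid: Boolean = typeChecks("val u: java.util.UUID = bicycleId")

// Going through the public API (the `value` extension) should type check.
val usesPublicApi: Boolean = typeChecks("val u: java.util.UUID = bicycleId.value")

@main def opaqueDemo(): Unit =
  println(s"leaks as UUID: $leaksAsUuid, public API works: $usesPublicApi")
```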
More information at Opaque Type Aliases.
Functional programming tends to express most dependencies as simple function parameterization. This is clean and powerful, but it sometimes leads to functions that take many parameters where the same value is passed over and over again in long call chains to many functions. Context parameters can help here since they enable the compiler to synthesize repetitive arguments instead of the programmer having to write them explicitly.
Using clauses are present in the module 03-u-data-generator, where they are used to handle generic serializers.
def avroSerializer[T](configuration: Configuration, includeKey: Boolean)
  (using codec: Codec[T]): Serializer[T] =
  new Serializer[T]: // More code here
import com.fortyseven.domain.codecs.iot.IotCodecs.given // Codec[GPSPosition] is here
val gpsPositionSerializer = avroSerializer[GPSPosition](
  Configuration(configuration.schemaRegistry.schemaRegistryUrl),
  includeKey = false
)
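A self-contained sketch of the same idea, assuming a hypothetical Encoder type class that stands in for a codec (it is not the Vulcan API). It shows a context parameter being forwarded along a call chain and, alternatively, being passed explicitly with using.

```scala
// Hypothetical type class standing in for a codec; not the Vulcan API.
trait Encoder[T]:
  def encode(value: T): String

given Encoder[Int] with
  def encode(value: Int): String = value.toString

// The using clause marks `encoder` as a context parameter.
def serialize[T](values: List[T])(using encoder: Encoder[T]): String =
  values.map(encoder.encode).mkString(",")

// The context parameter is forwarded to `serialize` without being written out.
def serializeWithHeader[T](header: String, values: List[T])(using Encoder[T]): String =
  s"$header:${serialize(values)}"

// An instance that is not a given can still be supplied explicitly.
val hexEncoder: Encoder[Int] = new Encoder[Int]:
  def encode(value: Int): String = value.toHexString

@main def usingDemo(): Unit =
  println(serializeWithHeader("ints", List(1, 2)))
  println(serialize(List(10, 255))(using hexEncoder))
```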
More information at Using Clauses.
//TODO this section is not clear yet
If we were to combine some of the prior new features of Scala 3, we could build natively refined types and their APIs.
The first example combines the following features:
- compile-time operations
- given instances
- implicit conversions
- inline
- new control syntax
- opaque type aliases
opaque type PositiveInt = Int
object PositiveInt:
  def from(intCandidate: Int): Either[Throwable, PositiveInt] =
    if intCandidate < 0
    then Left(new IllegalStateException(s"The provided int $intCandidate is not positive."))
    else Right(intCandidate)
  inline def apply(int: Int): PositiveInt =
    requireConst(int)
    inline if int < 0
    then error("Int must be positive.")
    else int
  given Conversion[PositiveInt, Int] with
    override def apply(x: PositiveInt): Int = x
And with this small amount of code we get all this functionality:
scala> PositiveInt.from(-1)
val res: Either[Throwable, com.fortyseven.common.configuration.refinedTypes.PositiveInt] =
Left(java.lang.IllegalStateException: The provided int -1 is not positive.)
scala> PositiveInt.from(0)
val res: Either[Throwable, com.fortyseven.common.configuration.refinedTypes.PositiveInt] = Right(0)
scala> PositiveInt(0) // PositiveInt.apply(0) checks during compilation time
val res: com.fortyseven.common.configuration.refinedTypes.PositiveInt = 0
scala> PositiveInt(-1) // PositiveInt.apply(-1) checks during compilation time
-- Error: ----------------------------------------------------------------------
1 |PositiveInt(-1)
|^^^^^^^^^^^^^^^
|Int must be positive.
scala> PositiveInt(0)
val res: com.fortyseven.common.configuration.refinedTypes.PositiveInt = 0
scala> PositiveInt(0) + 1 // Implicit conversion
val res: Int = 1
scala> 1 + PositiveInt(0) // Implicit conversion
val res: Int = 1
The second example combines the following features:
- compile-time operations
- enum
- inline
- new control syntax
enum KafkaAutoOffsetReset:
  case Earliest, Latest, None
object KafkaAutoOffsetReset:
  def from(kafkaAutoOffsetResetCandidate: String): Either[Throwable, KafkaAutoOffsetReset] =
    Try(valueOf(kafkaAutoOffsetResetCandidate)).toEither
  inline def apply(kafkaAutoOffsetReset: String): KafkaAutoOffsetReset =
    requireConst(kafkaAutoOffsetReset)
    inline if values.map(_.toString).contains(kafkaAutoOffsetReset) // This inline fails in the console. Needs checking
    then valueOf(kafkaAutoOffsetReset)
    else error("The valid values are Earliest, Latest and None.")
And this is what we get:
scala> KafkaAutoOffsetReset.values
val res: Array[com.fortyseven.common.configuration.refinedTypes.KafkaAutoOffsetReset] = Array(Earliest, Latest, None)
scala> KafkaAutoOffsetReset.valueOf("None")
val res: com.fortyseven.common.configuration.refinedTypes.KafkaAutoOffsetReset = None
scala> KafkaAutoOffsetReset.from("None")
val res: Either[Throwable, com.fortyseven.common.configuration.refinedTypes.KafkaAutoOffsetReset] = Right(None)
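The inline apply above is flagged as failing, most likely because values.map(_.toString).contains(...) is not a compile-time constant, which inline if requires. One possible alternative, offered only as a sketch and not as the project's implementation, is an inline match over the literal. It redeclares the enum to stay self-contained and assumes that marking the parameter as inline is acceptable for the API.

```scala
import scala.compiletime.error

enum KafkaAutoOffsetReset:
  case Earliest, Latest, None

object KafkaAutoOffsetReset:
  // `inline` on the parameter keeps the string literal visible to the inline match,
  // so the branch is selected at compile time.
  inline def apply(inline name: String): KafkaAutoOffsetReset =
    inline name match
      case "Earliest" => Earliest
      case "Latest"   => Latest
      case "None"     => None
      case _          => error("The valid values are Earliest, Latest and None.")

@main def enumInlineDemo(): Unit =
  // Compiles because the argument is one of the accepted literals.
  println(KafkaAutoOffsetReset("Latest"))
```

With this shape, a call with any other literal is expected to be rejected at compile time with the message given to error. The list of accepted names has to be kept in sync with the enum cases by hand.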
There are three main frameworks used in the project:
- Apache Flink
- Apache Kafka
- Apache Spark
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
More information at Apache Flink.
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
In this project, Kafka is used as an event bus that connects multiple pieces of the pipeline. One of the goals of the project is to have the same event processed by multiple processors (right now Flink and Spark).
More information at Apache Kafka.
Apache Spark™ is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
In this project, Spark can be executed using sbt (beware of the flag --add-opens) or docker.
More information at Apache Spark.
Apache Avro™ is the leading serialization format for record data, and first choice for streaming data pipelines. It offers excellent schema evolution.
In this project, it is implemented with Vulcan.
More information at Apache Avro.
Cats is a library which provides abstractions for functional programming in the Scala programming language.
Cats strives to provide functional programming abstractions that are core, binary compatible, modular, approachable and efficient. A broader goal of Cats is to provide a foundation for an ecosystem of pure, typeful libraries to support functional programming in Scala applications.
Cats, through Cats Effect, provides the effect system used in the whole project except for the module processor-spark. Spark's internal way of working does not benefit from an effect system on top of it.
More information at Typelevel Cats.
Functional, lightweight, and composable configuration loading for Scala.
It is used in the module ciris.
More information at Ciris.
Functional, effectful, concurrent streams for Scala.
It is used in the modules data-generator, kafka-consumer and processor-flink.
More information at FS2.
Logback is intended as a successor to the popular log4j project, picking up where log4j 1.x leaves off.
Logback's architecture is quite generic so as to apply under different circumstances. At present time, logback is divided into three modules, logback-core, logback-classic and logback-access. This project only uses logback-classic.
The logback-core module lays the groundwork for the other two modules. The logback-classic module can be assimilated to a significantly improved version of log4j 1.x. Moreover, logback-classic natively implements the SLF4J API so that you can readily switch back and forth between logback and other logging frameworks such as log4j 1.x or java.util.logging (JUL).
It is used in the module main and logs the underlying activity of the modules involved in the execution of the program.
More information at QOS Logback.
Scala testing library with actionable errors and extensible APIs.
This project uses two modules of munit that are compatible with cats and with scala-check.
- Cats: https://github.com/typelevel/munit-cats-effect
- Scala Check: https://scalameta.org/munit/docs/integrations/scalacheck.html
More information at Scalameta Munit.
PureConfig is a Scala library for loading configuration files. It reads Typesafe Config configurations written in HOCON, Java .properties, or JSON to native Scala classes in a boilerplate-free way. Sealed traits, case classes, collections, optional values, and many other types are all supported out-of-the-box. Users also have many ways to add support for custom types or customize existing ones.
In this project, both automatic derivation and custom types are used. Find the implementations in the module pureconfig.
More information at PureConfig.
Scala wrapper for testcontainers-java that allows using docker containers for functional/integration/unit testing.
TestContainers is a Java 8 library that supports JUnit tests, providing lightweight, throwaway instances of common databases, Selenium web browsers, or anything else that can run in a Docker container.
In this project, it is used for the integration testing of processor-flink.
More information at Testcontainers-scala.
Functional Avro encodings for Scala using the official Apache Avro library.
It is used to encode and decode the events between the data-generator, the kafka-consumer and processor-flink.
More information at FD4S Vulcan.
Sbt plugin originally ported from codahale's assembly-sbt, which I'm guessing was inspired by Maven's assembly plugin. The goal is simple: Create a über JAR of your project with all of its dependencies.
More information at sbt-assembly.
Sbt plugin to check that your libraryDependencies accurately reflects the libraries that your code depends on in order to compile.
More information at sbt-explicit-dependencies.
Refactoring and linting tool for Scala.
More information at Scala Center.
Code formatter for Scala.
More information at Scalameta Scalafmt.
Apache-2.0, see LICENSE