Demonstration von Kafka 2.1 anhand einiger (Spring Boot basierter) Beispielanwendungen.
Die Demonstration zeigt:
-
Kafka Producer
-
Kafka Consumer
-
Kafka Streams
-
Avro (als Datenübertragungsformat innerhalb Kafka)
-
Schema Registry (zur Veröffentlichung der Avro-Schemata)
-
KSQL (SQL-basiertes Stream Processing via Kommandozeile)
Voraussetzungen:
-
Java JDK 11 (oder höher)
-
Docker / Docker Compose
Eine Docker-Installation vorausgesetzt, können Sie Kafka und die weiteren benötigten Dienste (Zookeeper, Schema Registry etc.) mit der docker-compose.yml
-Datei vom docker
-Verzeichnis aus starten:
kafka-demo/docker$ docker-compose up --build -d
Kafka und die zugehörigen Dienste werden beendet mit dem Befehl (ebenfalls im docker
-Verzeichnis auszuführen):
kafka-demo/docker$ docker-compose down
Das Projekt wird mit Maven (d.h. dem Maven-Wrapper) folgendendermaßen gebaut:
kafka-demo$ mvnw install
Unter Windows ist statt mvnw
jeweils mvnw.cmd
zu verwenden. Dies gilt auch für alle nachfolgend gezeigten Maven-Kommandos.
Fachlich drehen sich die Demoanwendungen um das Thema "Logging". Sie beschreiben ein Szenario, in dem alle IT-Systeme eines Unternehmens ihre Lognachrichten in das Topic logging
eines Kafka-Clusters schreiben, z.B. mittels eines KafkaAppenders
.
Letzterer ist Teil des Logging-Frameworks und spielt in dieser Demo keine Rolle.
Stattdessen werden Lognachrichten im Modul producer
zufällig erzeugt und über einen Kafka-Producer nach Kafka geschrieben.
Für die Datenübertragung der Lognachrichten in Kafka wird das Avro-Schema LogMessage
aus dem Modul model
verwendet.
Dort definiert das Avro-Schema log_message.avsc
die Datenstruktur für Lognachrichten.
Während des Kompiliervorangs des Moduls model
wird aus dem Schema die Klasse de.doubleslash.demo.kafka.avro.LogMessage
generiert.
Diese landet im Verzeichnis kafka-demo/model/target/generated-sources/avro
.
Eine Java-Anwendung verarbeitet die Lognachrichten mithilfe der Kafka-Streams-API weiter. Jede Lognachricht bekommt vom LogMessageProcessor
eine UUID. Anschließend werden alle Nachrichten mit Log-Level ERROR in das Kafka-Topic logging-alerts
geschrieben. Alle anderen Lognachrichten landen im Topic logging-processed
.
Damit die Anwendungen auf den Kafka-Broker und die Schema-Registry im Docker-Container zugreifen können muss einmalig ein Eintrag in der /etc/hosts
hinzugefügt werden (unter Windos: C:\Windows\System32\drivers\etc\hosts
, Bearbeitung im Administrator-Modus). Folgender Eintrag gilt für Linux sowie Doker Desktop unter Windows 10:
# Kafka-Demo (Docker)
127.0.0.1 kafka
Wer noch mit Windows 7 und der Docker Toolbox arbeitet, muss die IP-Adresse der Docker-VM angeben. In der Regel ist dies:
# Kafka-Demo (Docker)
192.168.99.100 kafka
Die Anwendung KafkaTemplateProducerDemoApp
schreibt zufällig generierte Nachrichten in das Kafka-Topic kafka-demo
.
Sie nutzt dafür das KafkaTemplate
, das Teil der Kafka-Spring-Integration spring-kafka
ist.
Führen Sie im producer-demo
-Verzeichnis den folgenden Befehl aus:
kafka-demo/producer-demo$ ../mvnw spring-boot:run
Die KafkaSpringConsumerDemoApp
im Modul consumer
zeigt, mit wie wenig Code man für das Konsumieren von Nachrichten auskommt, wenn man die Spring-Integration für Kafka (hier: @KafkaListener
) nutzt.
Die Anwendung liest die Nachrichten aus dem Topic logging-demo
und loggt diese aus.
Führen Sie im consumer
-Verzeichnis den folgenden Befehl aus:
kafka-demo/consumer$ ../mvnw spring-boot:run
Die Spring-Boot-Anwendung KafkaLogMessageProducerDemoApp
im Modul producer-logging
erzeugt zufällige Lognachrichten und schreibt diese nach Kafka in das Topic: logging
.
Die Spring-Boot-Anwendung KafkaStreamsDemoApp
im Modul streams
verarbeitet die Lognachrichten aus dem Topic logging
mithilfe der Kafka-Streams-API.
Jede Lognachricht bekommt eine UUID.
Anschließend werden alle Nachrichten mit Log-Level ERROR in das Kafka-Topic logging-alerts
geschrieben.
Alle anderen Lognachrichten landen im Topic logging-processed
.
Diese Anwendung verwendet zu Demonstrationszwecken nicht die Spring-Integration; das Starten und Stoppen des Streams geschieht über explizite Methodenaufrufe in der Anwendungsklasse, gemäß der Implementierung in Nicht-Spring-Anwendungen.
Führen Sie im streams
-Verzeichnis den folgenden Befehl aus:
kafka-demo/streams$ ../mvnw spring-boot:run
Die Anwendung KafkaStreamsTableDemoApp
im Modul streams-table
zeigt zum einen die Verwendung einer Tabelle (KTable
), und zum anderen das Auslesen der Daten aus dem Store, welcher der Tabelle zugrunde liegt.
Die Applikation verarbeitet ebenfalls die Nachrichten aus dem Topic logging
.
In einer Tabelle wird die Anzahl aufgetretener Lognachrichten je Loglevel folgendermaßen gezählt:
Loglevel | Anzahl Nachrichten |
---|---|
DEBUG |
12747 |
ERROR |
8465 |
INFO |
147859 |
WARN |
42286 |
Die Tabelle mit den Zählerstände wird zudem wieder in einen Stream umgewandelt, der in das Kafka-Topic logging-counts
schreibt.
Der LogMessageCounterRestController
liest die Daten aus dem Store und liefert die jeweils aktuellen Zählerstände im JSON-Format zurück.
Der REST-Service liefert die Zählerdaten über die URL http://localhost:8080/logging/counts
zurück.
Über die URL http://localhost:8080/logging/store
kann man sich die Informationen zum Store ausgeben lassen.
Im Gegensatz zum Beispiel im Modul streams
wird hier Kafka Streams mit Spring-Integration verwendet.
Das Vorhandensein einer @Bean
vom Typ KStream
und entsprechender Konfiguration ist bereits ausreichend; das Starten und Stoppen/Aufräumen des Streams geschieht automatisch über den Lebenszyklus der Spring Boot Application.
Wichtig:
In einem echten System hätte man mehrere Instanzen der Streaming-Anwendung.
Da der Store hinter der KTable
lokal ist, gibt der REST-Service lediglich die Zählerstände der in dieser Instanz verarbeiteten Nachrichten zurück.
Wollte man bei mehreren Instanzen die Summe der verarbeiteten Nachrichten insgesamt haben, müsste man alle Instanzen anfragen und die Ergebnisse anschließend aufsummieren.
Um die nach Kafka geschriebenen Lognachrichten zu sehen können Sie wie folgt vorgehen:
Kafka Console-Consumer (Docker-Container)
Sie öffnen eine Bash im Kafka-Container und lassen sich die Nachrichten eines Topics wie folgt ausgeben:
kafka-demo/producer$ docker exec -it kafka bash
root@kafka:/# /usr/bin/kafka-console-consumer --bootstrap-server kafka:9092 \
--topic logging --from-beginning
Gleichermaßen können Sie sich die Inhalte der anderen Topics anzeigen lassen, indem Sie für den Parameter --topic
statt logging
das Topic logging-processed
oder logging-alerts
angeben.
Kafka Topics UI
Alternativ öffnen Sie in einem Browser die URL http://kafka:8000/
. Wählen Sie das gewünschte Topic, um dessen Inhalt zu inspizieren.
Als Datenübertragungsformat für die Lognachrichten in Kafka wird Avro verwendet. Bei Nutzung von Avro ist ein Schema für die zu übertragenden Datentypen zwingend erforderlich.
Dieses Schema wird sowohl für die Serialisierung beim Schreiben nach Kafka, als auch für die Deserialisierung beim Lesen aus Kafka benötigt. Der Kafka-Producer schreibt das Schema, sofern dort noch nicht vorhanden, in die Schema-Registry. Von dort holen sich die Consumer, wie z.B. unsere Streams-Demoapplikation das Schema, damit sie wissen wie die Daten deserialisiert werden müssen.
Das Ganze geschieht vollautomatisch; man braucht sich nicht darum zu kümmern - lediglich in der Konfiguration der Anwendung muss die URL zur Schema Registry konfiguriert sein.
Die in der Schema Registry registrierten Schemata können Sie sich in der Schema-Registry-UI anschauen. Dazu öffnen Sie in einem Browser die URL http://kafka:8001/.
Die folgenden Kommandos zeigen, wie man mit KSQL auf der Kommandozeile in einer SQL-ähnlichen Syntax, ganz ohne Programmierung, Daten aus einem Kafka-Topic verarbeiten kann.
Die von der KafkaLogMessageProducerDemoApp
produzierten Loganachrichten werden aus dem Topic logging
gelesen.
Die Lognachrichten mit Loglevel ERROR werden dann in ein neues Topic namens ERROR_LOGS
geschrieben.
Voraussetzungen
-
docker-compose
muss mit den Dienstenkafka
,ksqldb-server
undksqldb-cli
gestartet sein. -
Die
KafkaLogMessageProducerDemoApp
muss laufen.
Hinweis
Da KSQL alle Eingaben als Großschreibung interpretiert, müssen Namen von Kafka-Topics, die Kleinbuchstaben enthalten in Anführungszeichen (single quotes '
) stehen.
-- Eine bash im `ksqldb-cli` Dockercontainer öffnen
$ docker exec -it ksqldb-cli bash
-- KSQL Command Line Interface starten
$ ksql http://ksqldb-server:8088
-- Topic 'logging' ausgeben (abbrechen mit Strg+C)
ksql> print logging;
-- Input-Stream erzeugen der aus Topic 'logging' liest
ksql> create stream logstream with (kafka_topic='logging', value_format='AVRO');
-- Stream erzeugen, der nur Lognachrichten mit Loglevel ERROR enthält
ksql> create stream error_logs as select loglevel, message from logstream where loglevel='ERROR';
-- Inhalt des Streams ausgeben (abbrechen mit Strg+C)
ksql> select * from error_logs emit changes;
-- Kafka-Topics anzeigen lassen => Neues Topic ERROR_LOGS wurde angelegt
ksql> show topics;
-- Inhalt des Topics ERROR_LOGS ausgeben lassen (abbrechen mit Strg+C)
ksql> print ERROR_LOGS;
Nun kann man sich die Nachrichten des neuen Topics ERROR_LOGS
auch mit dem kafka-console-consumer
ausgeben lassen (s.o.).