Skip to content

hifly81/kafka-examples

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Practical examples with Apache Kafka®

Table of Contents

General Information

This repository contains various examples designed to demonstrate the functionality of Apache Kafka®. The examples are mostly extracted from more complex projects and should not be considered complete or ready to be used in a production environment, unless after a serious refactoring and test work.

Export Documentation

Clone this repository:

git clone git@github.com:hifly81/kafka-examples.git

Install asciidoctor:

Linux:

gem install asciidoctor --pre

Mac:

brew install asciidoctor

Export documentation:

# PDF
asciidoctor-pdf README.adoc

# HTML
asciidoctor README.html

License

Installation with Docker

Official documentation on how to install Docker on Linux/Mac/Windows is available at this link: https://docs.docker.com/engine/install/

Prerequisites

List of software required on your local machine to run the examples:

  • curl

  • wget

  • openssl

  • Java SE 17 or 21

  • keytool from Java distribution

  • Apache Maven 3.x

  • Go Programming language (for proxy example)

  • Python (for python clients)

Docker Images Version

Default image version for required components is listed in file .env

If you needed to change the docker image version for the specific components, just update file .env.

Docker Images details

Apache Kafka® docker images are downloaded from Docker Hub apache/kafka and are based on Apache Kafka® version 3.8.x).

Single-node cluster

To run a single-node cluster (KRaft controller and Broker node combined) using Docker, run the docker-compose.yml file available in the root directory. It also contains a container with kcat:

  • broker: apache/kafka, listening on port 9092

  • kcat: confluentinc/cp-kcat

Start

scripts/bootstrap.sh

Stop

scripts/tear-down.sh

Multi nodes cluster

To run a cluster with 3 KRaft controller and 3 Broker nodes using Docker on different containers, use the file in apache-kafka github repository: https://raw.githubusercontent.com/apache/kafka/trunk/docker/examples/jvm/cluster/isolated/plaintext/docker-compose.yml

  • kafka-1: apache/kafka, listening on port 29092

  • kafka-2: apache/kafka, listening on port 39092

  • kafka-3: apache/kafka, listening on port 49092

Start

scripts/bootstrap-isolated.sh

Stop

scripts/tear-down-isolated.sh

Installation on Kubernetes using Confluent For Kubernetes

Prerequisites

List of software required on your local machine to run the examples:

Minikube installation in Linux with KVM/QEMU

Follow instructions for ArchLinux (also tested with Fedora)

Minikube installation in Mac/Windows

Deployment of a cluster with Confluent for Kubernetes Operator (CFK)

Start Minikube with kvm2 driver (Linux):

minikube delete
minikube config set driver kvm2

Start Minikube with docker driver (Mac):

minikube delete
minikube config set driver docker
touch /tmp/config && export KUBECONFIG=/tmp/config
minikube start --memory 16384 --cpus 4

Create a k8s namespace named confluent:

kubectl create namespace confluent
kubectl config set-context --current --namespace confluent

Add confluent repository to helm:

helm repo add confluentinc https://packages.confluent.io/helm
helm repo update

Install confluent-for-kubernetes operator (latest version) from Confluent’s Helm repo:

helm upgrade --install confluent-operator confluentinc/confluent-for-kubernetes --set kRaftEnabled=true

Deploy Confluent components

1 controller, 3 brokers:

kubectl apply -f confluent-for-kubernetes/k8s/confluent-platform-reducted.yaml

List pods:

kubectl get pods

NAME                                  READY   STATUS    RESTARTS   AGE
confluent-operator-665db446b7-j52rj   1/1     Running   0          6m35s
kafka-0                               1/1     Running   0          65s
kafka-1                               1/1     Running   0          65s
kafka-2                               1/1     Running   0          65s
kraftcontroller-0                     1/1     Running   0          5m5s

Verify events and pods:

watch -n 5 "kubectl get events --sort-by='.lastTimestamp'"
watch -n 5 "kubectl get pods"

alternately, you can install additional Confluent components: 1 controller, 3 brokers, 1 connect, 1 ksqldb, 1 schema registry, 1 rest proxy:

kubectl apply -f confluent-for-kubernetes/k8s/confluent-platform.yaml

Apache Kafka® Operations

Topic create:

kubectl exec --stdin --tty kafka-0 -- /bin/bash
kafka-topics --bootstrap-server localhost:9092 --create --topic test-1
exit

Topic list:

kubectl exec --stdin --tty kafka-0 -- /bin/bash
kafka-topics --bootstrap-server localhost:9092 --list
exit

Topic describe:

kubectl exec --stdin --tty kafka-0 -- /bin/bash
kafka-topics --bootstrap-server localhost:9092 --topic test-1 --describe
exit

Produce messages to Topic:

kubectl exec --stdin --tty kafka-0 -- /bin/bash
kafka-producer-perf-test --num-records 1000000 --record-size 1000 --throughput -1 --topic test-1 --producer-props bootstrap.servers=localhost:9092
exit

Consume messages from Topic:

kubectl exec --stdin --tty kafka-0 -- /bin/bash
kafka-console-consumer --bootstrap-server localhost:9092 --topic test-1 --from-beginning
exit

Tear Down

Shut down Confluent components and the data:

kubectl delete -f confluent-for-kubernetes/k8s/topic.yml
kubectl delete -f confluent-for-kubernetes/k8s/producer.yml
kubectl delete -f confluent-for-kubernetes/k8s/confluent-platform.yaml
helm delete confluent-operator

Delete namespace confluent:

kubectl delete namespace confluent

Delete minikube:

minikube delete

Apache Kafka® producers

Some implementations of Apache Kafka® producers.

Execute tests:

cd kafka-producer
mvn clean test

String Serializer

It uses org.apache.kafka.common.serialization.StringSerializer class for key and value serialization.

Create topic topic1:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic topic1 --replication-factor 1 --partitions 1

Produce on topic topic1:

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.string.Runner"

Json Serializer

Create topic test_custom_data:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic test_custom_data --replication-factor 1 --partitions 1

Produce on topic test_custom_data:

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.json.Runner"

Partitioner

It uses a custom partitioner for keys.

Messages with key Mark go to partition 1, with key Antony to partition 2 and with key Paul to partition 3.

Create topic demo-test with 3 partitions:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic demo-test --replication-factor 1 --partitions 3

Produce on topic demo-test:

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.partitioner.custom.Runner"

Create Timestamp vs LogAppend Timestamp

Create Timestamp

Message timestamp is set on headers when the message has been produced. This is the default behaviour, Create Timestamp.

Create topic topic1:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic topic1 --replication-factor 1 --partitions 1

Consume from topic1 and print out the message timestamp:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server broker:9092 --from-beginning --property print.timestamp=true

Produce records on topic1:

docker exec broker /opt/kafka/bin/kafka-producer-perf-test.sh --topic topic1 --num-records 1000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=broker:9092

Check consumer log for message timestamp:

CreateTime:1697359570614	YQHHNEBSEPDNSEIFGAMSUJXKOLTXSPLGHDIOYZJFNIDSPWHZMKVJAXDBZFCOXYKYRJOGYKDESSJMOIIOWVKYUAVWJLXSEPPFEILV
CreateTime:1697359570621	BASHCGRHSYGIFSYLVGRXCDVABWWTRQZTMMPBAXGHEPHTASSORYKGVPFGQYJKINSZUJLXQUUDVALUSBFRSXNQHSDFDBAKQZZNTYXF
CreateTime:1697359570621	HYGDPYGNRETYAXIXXYQKMKURDSJYIZNEDAHVIVHCJAPGOBQLHUZTKIWTVFEHVYPNGHIDSERMARFXCPYFEPQMFDOTDPWNKMYRMFIA
CreateTime:1697359570621	BIQAWWOIFIAKNYFEPTPMIXPQAXFEIKUFFXIDHILBPCBTHWDRMALHFNDCRHAYVLLMRCKJIPNPKGWCIWQCHNHSFSCTYSAKSLVZCCAI

LogAppend Timestamp

Message timestamp is set on headers when the record arrives at the broker, the broker will override the timestamp of the producer record with its own timestamp (the current time of the broker environment) as it appends the record to the log.

Create topic topic2 with message.timestamp.type=LogAppendTime:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic topic2 --replication-factor 1 --partitions 1 --config message.timestamp.type=LogAppendTime

Consume from topic2 and print out the message timestamp:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic topic2 --bootstrap-server broker:9092 --from-beginning --property print.timestamp=true

Produce records on topic2:

docker exec broker /opt/kafka/bin/kafka-producer-perf-test.sh --topic topic2 --num-records 1000 --record-size 100 --throughput -1 --producer-props bootstrap.servers=broker:9092

Check consumer log for message timestamp:

LogAppendTime:1697359857981	YQHHNEBSEPDNSEIFGAMSUJXKOLTXSPLGHDIOYZJFNIDSPWHZMKVJAXDBZFCOXYKYRJOGYKDESSJMOIIOWVKYUAVWJLXSEPPFEILV
LogAppendTime:1697359857981	BASHCGRHSYGIFSYLVGRXCDVABWWTRQZTMMPBAXGHEPHTASSORYKGVPFGQYJKINSZUJLXQUUDVALUSBFRSXNQHSDFDBAKQZZNTYXF
LogAppendTime:1697359857981	HYGDPYGNRETYAXIXXYQKMKURDSJYIZNEDAHVIVHCJAPGOBQLHUZTKIWTVFEHVYPNGHIDSERMARFXCPYFEPQMFDOTDPWNKMYRMFIA
LogAppendTime:1697359857981	BIQAWWOIFIAKNYFEPTPMIXPQAXFEIKUFFXIDHILBPCBTHWDRMALHFNDCRHAYVLLMRCKJIPNPKGWCIWQCHNHSFSCTYSAKSLVZCCAI

Interceptor

Folder interceptors/

This example shows how to create a custom producer interceptor. Java class CreditCardProducerInterceptor will mask a sensitive info on producer record (credit card number).

Compile and package:

cd interceptors
mvn clean package

Run a consumer:

mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.consumer.Runner"

Run a producer:

mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.producer.Runner"

Verify output:

record is:XXXXXX
Topic: test_custom_data - Partition: 0 - Offset: 1

Python Producer

Install confluent-kafka-python lib confluent-kafka:

pip install confluent-kafka

or:

python3 -m pip install confluent-kafka

Create kafka-topic topic:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic kafka-topic --replication-factor 1 --partitions 1

Run producer:

cd kafka-python-producer
python producer.py

Apache Kafka® consumers

Implementation of a consumer that can be used with different deserializer classes (for key and value).

Java class ConsumerInstance can be customized with:

  • clientId (string)

  • groupId (string)

  • topics (string separated by comma)

  • key deserializer class (string)

  • value deserializer class (string)

  • partition assignment strategy (org.apache.kafka.clients.consumer.RangeAssignor|org.apache.kafka.clients.consumer.RoundRobinAssignor|org.apache.kafka.clients.consumer.StickyAssignor|org.apache.kafka.clients.consumer.CooperativeStickyAssignor)

  • isolation.level (read_uncommitted|read_committed)

  • poll timeout (ms)

  • consume duration (ms)

  • autoCommit (true|false)

  • commit sync (true|false)

  • subscribe mode (true|false)

Topics can be passed as argument 1 of the main program:

-Dexec.args="users,users_clicks"

Partition assignment strategy can be passed as argument 2 of the main program:

-Dexec.args="users,users_clicks org.apache.kafka.clients.consumer.RoundRobinAssignor"

Group id can be passed as argument 3 of the main program:

-Dexec.args="users,users_clicks org.apache.kafka.clients.consumer.RoundRobinAssignor group-1"

Execute tests:

cd kafka-consumer
mvn clean test

String Deserializer

It uses org.apache.kafka.common.serialization.StringDeserializer for key and value deserialization. Default topic is topic1.

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner"

Send messages to topic1:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic topic1 --property "parse.key=true" --property "key.separator=:"
> Frank:1

Consumer Partition Assignor

Range (default)

Create 2 topics, users and users_clicks with the same number of partitions:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic users --replication-factor 1 --partitions 3

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic users_clicks --replication-factor 1 --partitions 3

Run 2 consumer instances (2 different shells/terminals) belonging to the same consumer group and subscribed to user and user_clicks topics. Consumers uses org.apache.kafka.clients.consumer.RangeAssignor to distribute partition ownership.

mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner" -Dexec.args="users,users_clicks org.apache.kafka.clients.consumer.RangeAssignor range-group-app"

Send messages to both topics using the same key (Frank):

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic users --property "parse.key=true" --property "key.separator=:"
> Frank:1

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic users_clicks --property "parse.key=true" --property "key.separator=:"
> Frank:1

Verify that the same consumer instance will read both messages.

Group id group-XX - Consumer id: consumer-group-XX-1-421db3e2-6501-45b1-acfd-275ce8d18368 - Topic: users - Partition: 1 - Offset: 0 - Key: frank - Value: 1
Group id group-XX - Consumer id: consumer-group-XX-1-421db3e2-6501-45b1-acfd-275ce8d18368 - Topic: users_clicks - Partition: 1 - Offset: 0 - Key: frank - Value: 1

Round Robin

Create 2 topics, users and users_clicks with same number of partitions:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic users --replication-factor 1 --partitions 3

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic users_clicks --replication-factor 1 --partitions 3

Run 2 consumer instances (2 different shells/terminals) belonging to the same consumer group and subscribed to user and user_clicks topics; consumers uses org.apache.kafka.clients.consumer.RoundRobinAssignor to distribute partition ownership.

mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.core.Runner" -Dexec.args="users,users_clicks org.apache.kafka.clients.consumer.RoundRobinAssignor rr-group-app"

Send messages to both topics using the same key (Frank):

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic users --property "parse.key=true" --property "key.separator=:"
> Frank:1

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic users_clicks --property "parse.key=true" --property "key.separator=:"
> Frank:1

Verify that messages are read by different consumer instances.

Static membership

This example will show how to configure different consumer instances to use a unique group instance id and define a static membership for topic partitions.

After shutting down and then restarting the consumer instance, this will consume from the same partitions avoiding re-balancing.

Create topic topic1 with 12 partitions:

docker exec broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic topic1 --replication-factor 1 --partitions 12

Run 3 different consumer instances (from 3 different terminals) belonging to the same consumer group:

member1:

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.staticmembership.Runner" -Dexec.args="consumer-member1.properties"

member2:

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.staticmembership.Runner" -Dexec.args="consumer-member2.properties"

member3:

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.staticmembership.Runner" -Dexec.args="consumer-member3.properties"

Run a producer perf test to send messages to topic1:

docker exec -it broker /opt/kafka/bin/kafka-producer-perf-test.sh --topic topic1 --num-records 10000 --throughput -1 --record-size 2000 --producer-props bootstrap.servers=broker:9092

Consumers will start reading messages from partitions (e.g.):

  • member1 (1,2,3,4)

  • member2 (5,6,7,8)

  • member3 (9,10,11,12)

Try to shut down consumer instances (CTRL+C) and then re-start them again; verify that re-balancing will not happen and consumers will always read from the same partitions.

Read from the closest replica

This example shows how to use the feature (since Apache Kafka® 2.4+) for consumers to read messages from the closest replica, even if it is not a leader of the partition.

Start a cluster with 3 brokers on 3 different racks, dc1, dc2 and dc3:

scripts/bootstrap-racks.sh

Create topic topic-regional and assign partition leaderships only on broker 1 and 3 (dc1 and dc3):

docker exec broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic topic-regional --replication-factor 3 --partitions 3
docker exec -it broker /opt/kafka/bin/kafka-reassign-partitions.sh --bootstrap-server broker:9092 --reassignment-json-file /tmp/assignment.json --execute

docker exec -it broker /opt/kafka/bin/kafka-leader-election.sh --bootstrap-server broker:9092 --topic topic-regional --election-type PREFERRED --partition 0

docker exec -it broker /opt/kafka/bin/kafka-leader-election.sh --bootstrap-server broker:9092 --topic topic-regional --election-type PREFERRED --partition 1

docker exec -it broker /opt/kafka/bin/kafka-leader-election.sh --bootstrap-server broker:9092 --topic topic-regional --election-type PREFERRED --partition 2

Verify partitions with topic describe command:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --topic topic-regional --describe

Topic: topic-regional	TopicId: p-sy0qiQTtSTLTJSG7s7Ew	PartitionCount: 3	ReplicationFactor: 3	Configs:
	Topic: topic-regional	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 2,3,1	Offline:
	Topic: topic-regional	Partition: 1	Leader: 3	Replicas: 3,2,1	Isr: 3,1,2	Offline:
	Topic: topic-regional	Partition: 2	Leader: 1	Replicas: 1,3,2	Isr: 1,2,3	Offline:

Run a consumer that will read messages from broker2 from rack dc2:

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.rack.Runner"

Produce 50 messages:

docker exec -it broker /opt/kafka/bin/kafka-producer-perf-test.sh --topic topic-regional --num-records 50 --throughput 10 --record-size 1 --producer-props bootstrap.servers=broker:9092

Teardown:

scripts/tear-down-racks.sh

Consumers and retry topics

This solution could be implemented on consumer side to handle errors in processing records without blocking the input topic.

  1. Consumer processes records and commit the offset (auto-commit).

  2. If a record can’t be processed (simple condition here to raise an error, is the existence of a specific message HEADER named ERROR), it is sent to a retry topic, if the number of retries is not yet exhausted.

  3. When the number of retries is exhausted, record is sent to a DLQ topic.

  4. Number of retries is set at Consumer instance level.

Create topics retry-topic and dlq-topic:

docker exec broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic retry-topic --replication-factor 1 --partitions 1

docker exec broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic dlq-topic --replication-factor 1 --partitions 1

Run consumer managing retry topics:

cd kafka-consumer-retry-topics
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.retry.ConsumerRetries"

Send records:

docker exec kcat bash -c "echo 'alice,{"col_foo":1}'|kcat -b broker:9092 -t input-topic -P -K ,"

docker exec kcat bash -c "echo 'alice,{"col_foo":1}'|kcat -b broker:9092 -t input-topic -P -H ERROR=xxxxx -K ,"
docker exec kcat bash -c "echo 'alice,{"col_foo":1}'|kcat -b broker:9092 -t input-topic -P -H ERROR=xxxxx -K ,"
docker exec kcat bash -c "echo 'alice,{"col_foo":1}'|kcat -b broker:9092 -t input-topic -P -H ERROR=xxxxx -K ,"
docker exec kcat bash -c "echo 'alice,{"col_foo":1}'|kcat -b broker:9092 -t input-topic -P -H ERROR=xxxxx -K ,"

Verify in consumer log if messages are sent to retry and dlq topics:

Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 0 - Key: alice - Value: {col_foo:1}
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 1 - Key: alice - Value: {col_foo:1}
Error message detected: number of retries 3 left for key alice
send to RETRY topic: retry-topic
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 2 - Key: alice - Value: {col_foo:1}
Error message detected: number of retries 2 left for key alice
send to RETRY topic: retry-topic
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 3 - Key: alice - Value: {col_foo:1}
Error message detected: number of retries 1 left for key alice
send to RETRY topic: retry-topic
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 4 - Key: alice - Value: {col_foo:1}
Error message detected: number of retries 0 left for key alice
number of retries exhausted, send to DLQ topic: dlq-topic

Interceptor

Folder interceptors/

This example shows how to create a custom consumer interceptor. Java class CreditCardConsumerInterceptor will intercept records before deserialization and print headers.

Run a consumer:

mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.consumer.Runner"

Run a producer:

cd interceptors
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.interceptor.producer.Runner"

Verify output:

record headers:RecordHeaders(headers = [], isReadOnly = false)
Group id consumer-interceptor-g2 - Consumer id: consumer-consumer-interceptor-g2-1-0e20b2b6-3269-4bc5-bfdb-ca787cf68aa8 - Topic: test_custom_data - Partition: 0 - Offset: 0 - Key: null - Value: XXXXXX
Consumer 23d06b51-5780-4efc-9c33-a93b3caa3b48 - partition 0 - lastOffset 1

Python Consumer

Install confluent kafka python lib confluent-kafka:

pip install confluent-kafka

Create topic kafka-topic:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic kafka-topic --replication-factor 1 --partitions 1

Run producer:

cd kafka-python-producer
python producer.py

Run consumer:

cd kafka-python-consumer
python consumer.py

Admin & Management

Apache Kafka® CLI Tools

Apache Kafka® CLI are located in $KAFKA_HOME/bin directory.

Docker images provided are already shipped with CLI.

  1. kafka-acls - manage acls

  2. kafka-topics - create, delete, describe, or change a topic

  3. kafka-configs - create, delete, describe, or change cluster settings

  4. kafka-consumer-groups - manage consumer groups

  5. kafka-console-consumer - read data from topics and outputs it to standard output

  6. kafka-console-producer - produce data to topics

  7. kafka-consumer-perf-test - consume high volumes of data through your cluster

  8. kafka-producer-perf-test - produce high volumes of data through your cluster

  9. kafka-avro-console-producer - produce Avro data to topics with a schema (only with confluent installation)

  10. kafka-avro-console-consumer - read Avro data from topics with a schema and outputs it to standard output (only with confluent installation)

Topics: segments and retention

Create a topic cars with retention for old segments set to 5 minutes and size of segments set to 100 KB.

Be aware that log.retention.check.interval.ms is set by default to 5 minutes and this is the frequency in milliseconds that the log cleaner checks whether any log is eligible for deletion.

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic cars --replication-factor 1 --partitions 1 --config segment.bytes=100000 --config segment.ms=604800000 --config retention.ms=300000 --config retention.bytes=-1

Launch a producer performance session:

docker exec -it broker /opt/kafka/bin/kafka-producer-perf-test.sh --topic cars --num-records 99999999999999 --throughput -1 --record-size 1 --producer-props bootstrap.servers=broker:9092

Check the log dir for cars topic and wait for deletion of old segments (5 minutes + log cleaner trigger delta)

docker exec -it broker watch ls -ltr /tmp/kraft-combined-logs/cars-0/

Apache Kafka® Admin Client

Folder admin-client

It uses org.apache.kafka.clients.admin.AdminClient to execute Admin API.

Operations currently added:

  • list of cluster nodes

  • list topics

cd admin-client
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.admin.AdminClientWrapper" -Dexec.args="admin.properties"

Compression

Folder compression/

This example will show that messages sent to the same topic with different compression.type. Messages with different compression can be read by the same consumer instance.

Compressions supported on producer are:

  • none (no compression)

  • gzip

  • snappy

  • lz4

  • zstd

Send messages with different compression type and with batching disabled:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic topic1 --producer.config compression/client-none.properties --property "parse.key=true" --property "key.separator=:"
0:none
docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic topic1 --producer.config compression/client-gzip.properties --property "parse.key=true" --property "key.separator=:"
1:gzip
docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic topic1 --producer.config compression/client-snappy.properties --property "parse.key=true" --property "key.separator=:"
2:snappy
docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic topic1 --producer.config compression/client-lz4.properties --property "parse.key=true" --property "key.separator=:"
3:lz4
docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic topic1 --producer.config compression/client-zstd.properties --property "parse.key=true" --property "key.separator=:"
4:zstd

Run a consumer on topic1 topic:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic topic1 --bootstrap-server broker:9092 --from-beginning
none
gzip
snappy
lz4
zstd

Schema Registry

Confluent Avro Generic Record

It uses io.confluent.kafka.serializers.KafkaAvroSerializer for value serializer, sending an Avro GenericRecord.

Confluent Schema Registry is needed to run the example.

Avro schema car.avsc:

{
 "type": "record",
 "name": "Car",
 "namespace": "org.hifly.kafka.demo.producer.serializer.avro",
 "fields": [
  {
   "name": "model",
   "type": "string"
  },
  {
   "name": "brand",
   "type": "string"
  }
 ]
}

Start Confluent Schema Registry:

scripts/bootstrap-cflt-schema-registry.sh

Consume messages:

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.deserializer.avro.Runner" -Dexec.args="CONFLUENT"

Produce messages:

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.avro.Runner" -Dexec.args="CONFLUENT"

Teardown:

scripts/tear-down-cflt-schema-registry.sh

Apicurio Avro Generic Record

It uses io.apicurio.registry.utils.serde.AvroKafkaSerializer for value serializer, sending an Avro GenericRecord.

Apicurio Schema Registry is needed to run the example.

Avro schema car.avsc:

{
 "type": "record",
 "name": "Car",
 "namespace": "org.hifly.kafka.demo.producer.serializer.avro",
 "fields": [
  {
   "name": "model",
   "type": "string"
  },
  {
   "name": "brand",
   "type": "string"
  }
 ]
}

Start Apicurio:

scripts/bootstrap-apicurio.sh

Consume messages:

cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.deserializer.avro.Runner" -Dexec.args="APICURIO"

Produce messages:

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.avro.Runner" -Dexec.args="APICURIO"

Teardown:

scripts/tear-down-apicurio.sh

Hortonworks Avro Generic Record

It uses com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer for value serializer, sending an Avro GenericRecord.

Hortonworks Schema Registry is needed to run the example.

Avro schema car.avsc:

{
 "type": "record",
 "name": "Car",
 "namespace": "org.hifly.kafka.demo.producer.serializer.avro",
 "fields": [
  {
   "name": "model",
   "type": "string"
  },
  {
   "name": "brand",
   "type": "string"
  }
 ]
}

Start Hortonworks Schema Registry:

scripts/bootstrap-hortonworks-sr.sh
cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.serializer.avro.Runner" -Dexec.args="HORTONWORKS"

Teardown:

scripts/tear-down-hortonworks-sr.sh

Confluent Avro Specific Record

Implementation of a producer and a consumer using Avro SpecificRecord for serializing and deserializing.

Confluent Schema Registry is needed to run the example.

scripts/bootstrap-cflt-schema-registry.sh

Create cars topic:

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic cars --replication-factor 1 --partitions 1

Avro schema car_v1.avsc:

{"schema": "{\"type\": \"record\",\"name\": \"Car\",\"namespace\": \"org.hifly.kafka.demo.avro\",\"fields\": [{\"name\": \"model\",\"type\": \"string\"},{\"name\": \"brand\",\"type\": \"string\"}]}"}

Register first version of schema:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @confluent-avro-specific-record/src/main/resources/car_v1.avsc \
http://localhost:8081/subjects/cars-value/versions

Run the consumer:

cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerConsumer"

Run the producer:

cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerProducer"

Teardown:

scripts/tear-down-cflt-schema-registry.sh

Confluent Schema Registry: Schema Evolution

Backward

Changes allowed:

  • Delete fields

  • Add optional fields

Confluent Schema Registry is needed to run the example.

scripts/bootstrap-cflt-schema-registry.sh

Create car topic:

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic cars --replication-factor 1 --partitions 1

Avro schema car_v1.avsc:

{"schema": "{ \"type\": \"record\", \"name\": \"Car\", \"namespace\": \"org.hifly.kafka.demo.producer.serializer.avro\",\"fields\": [   {\"name\": \"model\",\"type\": \"string\"},{\"name\": \"brand\",\"type\": \"string\"}] }" }

Register a first version of schema:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v1.avsc \
http://localhost:8081/subjects/cars-value/versions

Set compatibility on BACKWARD:

curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://localhost:8081/config/cars-value

Verify compatibility for cars-value subject:

curl -X GET http://localhost:8081/config/cars-value

Run the producer:

cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerProducer"

Run the consumer (don’t stop it):

cd confluent-avro-specific-record
mvn clean compile package && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.RunnerConsumer"

View the latest schema for cars-value subject:

curl -X GET http://localhost:8081/subjects/cars-value/versions/latest | jq .

Avro schema car_v2.avsc:

{"schema": "{ \"type\": \"record\", \"name\": \"Car\", \"namespace\": \"org.hifly.kafka.demo.producer.serializer.avro\",\"fields\": [   {\"name\": \"engine\",\"type\": \"string\", \"default\":\"diesel\"}, {\"name\": \"model\",\"type\": \"string\"},{\"name\": \"brand\",\"type\": \"string\"}] }" }

Register a new version of schema, with the addition of a field with default value:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v2.avsc \
http://localhost:8081/subjects/cars-value/versions

Produce data with using the new schema:

sh produce-avro-records.sh

Verify that consumer will not break and continue to process messages.

Avro schema car_v3.avsc:

{"schema": "{ \"type\": \"record\", \"name\": \"Car\", \"namespace\": \"org.hifly.kafka.demo.producer.serializer.avro\",\"fields\": [   {\"name\": \"engine\",\"type\": \"string\"}, {\"name\": \"model\",\"type\": \"string\"},{\"name\": \"brand\",\"type\": \"string\"}] }" }

Register a new version of schema, with the addition of a field with a required value:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data @avro/car_v3.avsc \
http://localhost:8081/subjects/cars-value/versions

you will get an error:

{"error_code":42201,"message":"Invalid schema

Teardown:

scripts/tear-down-cflt-schema-registry.sh

Confluent Schema Registry: Multiple Event Types in same Topic

This example shows how to use Avro unions with schema references.

In this example a topic named car-telemetry will be configured with a schema car-telemetry.avsc and will store different Avro messages:

  • car-info messages from schema car-info.avsc

  • car-telemetry messages from schema car-telemetry-data.avsc

[
  "org.hifly.kafka.demo.avro.references.CarInfo",
  "org.hifly.kafka.demo.avro.references.CarTelemetryData"
]
{
  "type": "record",
  "name": "CarTelemetryData",
  "namespace": "org.hifly.kafka.demo.avro.references",
  "fields": [
    {
      "name": "speed",
      "type": "double"
    },
    {
      "name": "latitude",
      "type": "string"
    },
    {
      "name": "longitude",
      "type": "string"
    }
  ]
}
{
  "type": "record",
  "name": "CarInfo",
  "namespace": "org.hifly.kafka.demo.avro.references",
  "fields": [
    {
      "name": "model",
      "type": "string"
    },
    {
      "name": "brand",
      "type": "string"
    }
  ]
}

Confluent Schema Registry is needed to run the example.

scripts/bootstrap-cflt-schema-registry.sh

Register the subjects using Confluent Schema Registry maven plugin:

cd confluent-avro-multi-event
mvn schema-registry:register

[INFO] --- kafka-schema-registry-maven-plugin:7.4.0:register (default-cli) @ confluent-avro-references ---
[INFO] Registered subject(car-info) with id 1 version 1
[INFO] Registered subject(car-telemetry-data) with id 2 version 1
[INFO] Registered subject(car-telemetry-value) with id 3 version 1

Verify the subjects:

curl -X GET http://localhost:8081/subjects

["car-info","car-telemetry-data","car-telemetry-value"]

Verify the resulting schema for car-telemetry-value subject:

curl -X GET http://localhost:8081/subjects/car-telemetry-value/versions/1

{"subject":"car-telemetry-value","version":1,"id":3,"references":[{"name":"io.confluent.examples.avro.references.CarInfo","subject":"car-info","version":1},{"name":"io.confluent.examples.avro.references.CarTelemetryData","subject":"car-telemetry-data","version":1}],"schema":"[\"org.hifly.kafka.demo.avro.references.CarInfo\",\"org.hifly.kafka.demo.avro.references.CarTelemetryData\"]"}

Generate Java Pojo from avro schemas:

cd confluent-avro-multi-event
mvn clean package

Run a Consumer:

cd confluent-avro-multi-event
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.references.RunnerConsumer"

On a different shell, run a Producer:

cd confluent-avro-multi-event
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.references.RunnerProducer"

Verify records on Consumer:

Car Info event {"model": "Ferrari", "brand": "F40"} - offset-> 4
Car Telemetry event {"speed": 156.8, "latitude": "42.8", "longitude": "22.6"} - offset-> 5

Teardown:

scripts/tear-down-cflt-schema-registry.sh

Confluent Schema Registry: Nested objects

This example shows how to use Avro nested objects.

In this example a topic named car-telemetry will be configured with a schema car-telemetry-data.avsc with a nested schema reference from car.avsc

{
  "type": "record",
  "name": "CarTelemetryData",
  "namespace": "org.hifly.kafka.demo.avro.references",
  "fields": [
    {
      "name": "speed",
      "type": "double"
    },
    {
      "name": "latitude",
      "type": "string"
    },
    {
      "name": "longitude",
      "type": "string"
    },
    {
      "name": "info",
      "type": "org.hifly.kafka.demo.avro.references.CarInfo"
    }

  ]
}
{
  "type": "record",
  "name": "CarInfo",
  "namespace": "org.hifly.kafka.demo.avro.references",
  "fields": [
    {
      "name": "model",
      "type": "string"
    },
    {
      "name": "brand",
      "type": "string"
    }
  ]
}

Confluent Schema Registry is needed to run the example.

scripts/bootstrap-cflt-schema-registry.sh

Register the subjects using Confluent Schema Registry maven plugin:

cd confluent-avro-hierarchy-event
mvn schema-registry:register

[INFO] --- kafka-schema-registry-maven-plugin:7.4.0:register (default-cli) @ confluent-avro-hierarchy-event ---
[INFO] Registered subject(car-info) with id 4 version 2
[INFO] Registered subject(car-telemetry-value) with id 5 version 3

Generate Java Pojo from avro schemas:

cd confluent-avro-hierarchy-event
mvn clean package

Run a Consumer:

cd confluent-avro-hierarchy-event
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.references.app.RunnerConsumer"

On a different shell, run a Producer:

cd confluent-avro-hierarchy-event
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.avro.references.app.RunnerProducer"

Verify records on Consumer:

Record:{"speed": 156.8, "latitude": "42.8", "longitude": "22.6", "info": {"model": "Ferrari", "brand": "F40"}}

Teardown:

scripts/tear-down-cflt-schema-registry.sh

Apache Kafka® Connect

Unix commands Source Connector

Implementation of a sample Kafka Connect Source Connector; it executes unix commands (e.g. fortune, ls -ltr, netstat) and sends its output to a topic.

Important
unix commands are executed on connect worker node.

This connector relies on Confluent Schema Registry to convert messages using an Avro converter: io.confluent.connect.avro.AvroConverter.

{
    "name" : "unixcommandsource",
    "config": {
        "connector.class" : "org.hifly.kafka.demo.connector.UnixCommandSourceConnector",
        "command" : "fortune",
        "topic": "unixcommands",
        "poll.ms" : 5000,
        "tasks.max": 1
    }
}

Parameters for source connector:

  • command – unix command to execute (e.g. ls -ltr, fortune)

  • topic – output topic

  • poll.ms – poll interval in milliseconds between every execution

Create the connector package:

cd kafka-unixcommand-connector
mvn clean package

Run the Docker container:

scripts/bootstrap-unixcommand-connector.sh

This will create an image based on confluentinc/cp-kafka-connect-base using a custom Dockerfile.

It will use the confluent-hub utility confluent-hub install to install the plugin in connect.

Deploy the connector:

curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @kafka-unixcommand-connector/config/source.quickstart.json

Teardown:

scripts/tear-down-unixcommand-connector.sh

Custom SMT: composite key from json records.

Implementation of a custom Single Message Transformation (SMT); it creates a key from a list of json fields from message record value. Fields are configurable using SMT property fields.

Example:

Original record:

key: null
value: {"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}

Result after SMT:

"transforms.createKey.fields": "FIELD1,FIELD2,FIELD3"

key: 0120400001
value: {"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}

The example applies the SMT to a MongoDB sink connector.

Run the example:

scripts/bootstrap-smt-connector.sh

A MongoDB sink connector will be created with this config:

{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets",
    "transforms": "createKey",
    "transforms.createKey.type": "org.hifly.kafka.smt.KeyFromFields",
    "transforms.createKey.fields": "FIELD1,FIELD2,FIELD3"
  }
}

Original json messages will be sent to test topic.

Sink connector will apply the SMT and store the records in MongoDB pets collection from Tutorial2 database, using a key generated by the SMT.

Teardown:

scripts/tear-down-smt-connector.sh

SMT: log records with AOP

Usage of a predefined SMT to a MongoDB sink connector.

apply method for SMT classes in package org.apache.kafka.connect.transforms is intercepted by a Java AOP Aspect implemented using AspectJ framework.

The @Aspect, implemented in class org.hifly.kafka.smt.aspectj.SMTAspect, logs the input arg (SinkRecord object) to the standard output.

 @Pointcut("execution(* org.apache.kafka.connect.transforms.*.apply(..)) && !execution(* org.apache.kafka.connect.runtime.PredicatedTransformation.apply(..))")
    public void standardMethod() {}

    @Before("standardMethod()")
    public void log(JoinPoint jp) throws Throwable {

        Object[] array = jp.getArgs();
        if(array != null) {
            for(Object tmp: array)
                LOGGER.info(tmp.toString());
        }
    }

Connect log will show sink records entries:

SinkRecord{kafkaOffset=0, timestampType=CreateTime} ConnectRecord{topic='test', kafkaPartition=2, key=null, keySchema=Schema{STRING}, value={"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}, valueSchema=Schema{STRING}, timestamp=1683701851358, headers=ConnectHeaders(headers=)}

Run the example:

scripts/bootstrap-smt-aspectj.sh

Connect will start with aspectjweaver java agent:

-Dorg.aspectj.weaver.showWeaveInfo=true -Daj.weaving.verbose=true -javaagent:/usr/share/java/aspectjweaver-1.9.19.jar

Aspects are deployed as standard jars and copied to Kafka Connect classpath /etc/kafka-connect/jars/kafka-smt-aspectj-1.2.1.jar

A MongoDB sink connector will be created with this config:

{
  "name": "mongo-sink",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "topics": "test",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter.schemas.enable": false,
    "database": "Tutorial2",
    "collection": "pets",
    "transforms": "Filter",
    "transforms.Filter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.Filter.predicate": "IsFoo",
    "predicates": "IsFoo",
    "predicates.IsFoo.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsFoo.pattern": "test"

  }
}

Original json messages will be sent to test topic.

Sink connector will apply the SMT and store the records in MongoDB pets collection from Tutorial2 database.

Teardown:

scripts/tear-down-smt-aspectj.sh

Postgres to Mongo

In this example a JDBC source connector will copy rows from a Postgres table to a MongoDB collection. Rows containing a JSON CLOB not properly parsable will be sent to DLQ topic.

MongoDB sink connector example configured to send bad messages to a DLQ topic named dlq-mongo-accounts.

MongoDB Sink Connector has been configured to use a id strategy to determine the _id value for each document.

MongoDB Sink Connector has been configured to use a delete strategy when it receives a tombstone event.

Run the example:

scripts/bootstrap-postgres-to-mongo.sh

A JDBC source connector will be created with this config:

{
  "name": "jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "table.whitelist": "accounts",
    "mode": "incrementing",
    "incrementing.column.name": "seq_id",
    "topic.prefix": "jdbc_",
    "poll.interval.ms": "5000",
    "numeric.mapping": "best_fit",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "createKey,nestKey",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "transforms.nestKey.type": "org.apache.kafka.connect.transforms.ReplaceField$Key",
    "transforms.nestKey.renames": "id:originalId"
  }
}

A MongoDB sink connector will be created with this config:

{
  "name": "mongo-sink-dlq",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "errors.tolerance": "all",
    "topics": "jdbc_accounts",
    "errors.deadletterqueue.topic.name": "dlq-mongo-accounts",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "connection.uri": "mongodb://admin:password@mongo:27017",
    "database": "Employee",
    "collection": "account",
    "mongo.errors.log.enable":"true",
    "delete.on.null.values": "true",
    "document.id.strategy.overwrite.existing": "true",
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.FullKeyStrategy",
    "delete.writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.DeleteOneDefaultStrategy",
    "publish.full.document.only": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Validate results, query documents in Mongo collection account from database Employee:

docker exec -it mongo mongosh "mongodb://admin:password@localhost:27017" --eval 'db.getSiblingDB("Employee").account.find()'
[
  { _id: { originalId: '1' }, id: '1', ssn: 'AAAA' },
  { _id: { originalId: '2' }, id: '2', ssn: 'BBBB' },
  { _id: { originalId: '3' }, id: '3', ssn: 'CCCC' },
  { _id: { originalId: '4' }, id: '4', ssn: 'DDDD' },
  { _id: { originalId: '5' }, id: '5', ssn: 'EEEE' }
]

Teardown:

scripts/tear-down-postgres-to-mongo.sh

HTTP Sink Connector example

Example of usage of HTTP Sink Connector.

Run the example:

scripts/bootstrap-connect-sink-http.sh

A web application, exposing REST APIs, listening on port 8010 will start up.

A HTTP sink connector will be created with this config:

{
  "name": "SimpleHttpSink",
  "config":
  {
    "topics": "topicA",
    "tasks.max": "2",
    "connector.class": "io.confluent.connect.http.HttpSinkConnector",
    "http.api.url": "http://host.docker.internal:8010/api/message",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "confluent.topic.bootstrap.servers": "broker:9092",
    "confluent.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "broker:9092",
    "reporter.result.topic.name": "success-responses",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.name": "error-responses",
    "reporter.error.topic.replication.factor": "1",
    "consumer.override.max.poll.interval.ms": "5000"
  }
}

Send json messages to topicA topic:

docker exec -it broker kafka-console-producer --broker-list broker:9092 --topic topicA --property "parse.key=true" --property "key.separator=:"
> 1:{"FIELD1": "01","FIELD2": "20400","FIELD3": "001","FIELD4": "0006084655017","FIELD5": "20221117","FIELD6": 9000018}

Sink connector will execute an HTTP POST Request to the endpoint http://localhost:8010/api/message

Teardown:

scripts/tear-down-connect-sink-http.sh

S3 Sink Connector example

Example of usage of S3 Sink Connector.

Run the example:

scripts/bootstrap-connect-sink-s3.sh

MinIO will start listening on port 9000 (admin/minioadmin)

A S3 sink connector will be created with this config:

{
  "name": "sink-s3",
  "config":
  {
    "topics": "gaming-player-activity",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "store.url": "http://minio:9000",
    "s3.region": "us-west-2",
    "s3.bucket.name": "gaming-player-activity-bucket",
    "s3.part.size": "5242880",
    "flush.size": "100",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "schema.compatibility": "NONE"
  }
}

Sink connector will read messages from topic gaming-player-activity and store them in a S3 bucket gaming-player-activity-bucket using io.confluent.connect.s3.format.avro.AvroFormat as format class.

Sink connector will generate a new object storage entry every 100 messages (flush_size).

To generate random records for topic gaming-player-activity we will use jr tool.

Send 1000 messages to gaming-player-activity topic using jr:

docker exec -it -w /home/jr/.jr jr jr template run gaming_player_activity -n 1000 -o kafka -t gaming-player-activity -s --serializer avro-generic

Verify that 10 entries are stored in MinIO into gaming-player-activity-bucket bucket, connecting to MiniIO web console, http://localhost:9000 (admin/minioadmin):

gaming-player-activity-bucket

Teardown:

scripts/tear-down-connect-sink-s3.sh

Parquet format

Same example but Sink connector will read Avro messages from topic gaming-player-activity and store them in a S3 bucket gaming-player-activity-bucket using io.confluent.connect.s3.format.parquet.ParquetFormat as format class.

The format of data stored in MinIO will be Parquet.

Run the example:

scripts/bootstrap-connect-sink-s3-parquet.sh

A S3 sink connector will be created with this config:

{
  "name": "sink-parquet-s3",
  "config":
  {
    "topics": "gaming-player-activity",
    "tasks.max": "1",
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "store.url": "http://minio:9000",
    "s3.region": "us-west-2",
    "s3.bucket.name": "gaming-player-activity-bucket",
    "s3.part.size": "5242880",
    "flush.size": "100",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",
    "schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Send 1000 messages to gaming-player-activity topic using jr:

docker exec -it -w /home/jr/.jr jr jr template run gaming_player_activity -n 1000 -o kafka -t gaming-player-activity -s --serializer avro-generic

Verify that 10 entries are stored in MinIO into gaming-player-activity-bucket bucket, connecting to MiniIO web console, http://localhost:9000 (admin/minioadmin):

gaming-player-activity-bucket

Teardown:

scripts/tear-down-connect-sink-s3.sh

SAP HANA Source Connector example

Example of usage of SAP HANA Source Connector.

Run the example:

scripts/bootstrap-connect-source-sap-hana.sh

Insert rows in LOCALDEV.TEST table:

docker exec -i hana /usr/sap/HXE/HDB90/exe/hdbsql -i 90 -d HXE -u LOCALDEV -p Localdev1  > /tmp/result.log  2>&1 <<-EOF
INSERT INTO TEST (111, 'foo', 100,50);
INSERT INTO TEST (222, 'bar', 100,50);
EOF

A SAP HANA source connector will be created with this config:

{
  "name": "sap-hana-source",
  "config":
  {
    "topics": "testtopic",
    "tasks.max": "1",
    "connector.class": "com.sap.kafka.connect.source.hana.HANASourceConnector",
    "connection.url": "jdbc:sap://sap:39041/?databaseName=HXE&reconnect=true&statementCacheSize=512",
    "connection.user": "LOCALDEV",
    "connection.password" : "Localdev1",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "auto.create": "true",
    "testtopic.table.name": "\"LOCALDEV\".\"TEST\"",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

Source will read rows from LOCALDEV.TEST table and store in testtopic topic.

Teardown:

scripts/tear-down-connect-source-sap-hana.sh

Outbox Table: Event Router with SMT and JDBC Source Connector

In this example, some SMT transformations (chained) are used to create an Event Router starting from an input outbox table.

The outbox table contains different operations for the same aggregate (Consumer Loan); the different operations are sent on specific topics following these routing rules:

  • operation: CREATE -→ topic: loan

  • operation: INSTALLMENT_PAYMENT -→ topic: loan_payment

  • operation: EARLY_LOAN_CLOSURE -→ topic: loan

Records from the outbox table are fetched using a JDBC Source Connector.

Run the example:

scripts/bootstrap-connect-event-router.sh

Outbox table:

Outbox table
insert into outbox_table (id, aggregate, operation, payload, event_time) values (1, 'Consumer Loan', 'CREATE', '{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}','2023-11-20 10:00:00');

insert into outbox_table (id, aggregate, operation, payload, event_time) values (2, 'Consumer Loan', 'INSTALLMENT_PAYMENT', '{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}','2023-12-01 09:30:00');

insert into outbox_table (id, aggregate, operation, payload, event_time) values (3, 'Consumer Loan', 'EARLY_LOAN_CLOSURE', '{\"event\":{\"type\":\"Early Loan Closure\",\"timestamp\":\"2023-11-25T14:15:00\",\"data\":{\"mortgageId\":\"ABC12\",\"closureAmount\":150000,\"closureDate\":\"2023-11-25\",\"paymentMethod\":\"Bank Transfer\",\"transactionNumber\":\"PQR456\"}}}','2023-11-25 09:30:00');

A JDBC Source Connector will be created with this config:

{
  "name" : "pgsql-sample-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "topic.prefix": "",
    "poll.interval.ms" : 3600000,
    "table.whitelist" : "public.outbox_table",
    "mode":"bulk",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms":"valueToTopic,addPrefix,removeString1,removeString2",
    "transforms.valueToTopic.type":"io.confluent.connect.transforms.ExtractTopic$Value",
    "transforms.valueToTopic.field":"operation",
    "transforms.addPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.addPrefix.regex": ".*",
    "transforms.addPrefix.replacement": "loan$0",
    "transforms.removeString1.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.removeString1.regex": "(.*)CREATE(.*)",
    "transforms.removeString1.replacement": "$1$2",
    "transforms.removeString2.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.removeString2.regex": "(.*)INSTALLMENT(.*)",
    "transforms.removeString2.replacement": "$1$2",
    "topic.creation.default.replication.factor": 1,
    "topic.creation.default.partitions": 1
  }
}

Verify topic list:

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --list

__consumer_offsets
_schemas
docker-connect-configs
docker-connect-offsets
docker-connect-status
loan
loan_PAYMENT
docker exec -it broker /bin/bash
[appuser@broker ~]$ cat /tmp/kraft-combined-logs/loan-0/00000000000000000000.log
�����Wz���Wz�����������������Consumer Loan
CREATE�{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}&2023-11-20 10:00:00k'�z<��Wz���Wz�����������������Consumer Loan$EARLY_LOAN_CLOSURE�{\"event\":{\"type\":\"Early Loan Closure\",\"timestamp\":\"2023-11-25T14:15:00\",\"data\":{\"mortgageId\":\"ABC12\",\"closureAmount\":150000,\"closureDate\":\"2023-11-25\",\"paymentMethod\":\"Bank Transfer\",\"transactionNumber\":\"PQR456\"}}}&2023-11-25 09:30:00
docker exec -it broker /bin/bash
[appuser@broker ~]$ cat /tmp/kraft-combined-logs/loan_PAYMENT-0/00000000000000000000.log
,��A��Wz���Wz�����������������Consumer Loan&INSTALLMENT_PAYMENT�{\"event\": {\"type\":\"Mortgage Opening\",\"timestamp\":\"2023-11-20T10:00:00\",\"data\":{\"mortgageId\":\"ABC123\",\"customer\":\"John Doe\",\"amount\":200000,\"duration\": 20}}}&2023-12-01 09:30:00

Teardown:

scripts/tear-down-connect-event-router.sh

CDC with Debezium PostgreSQL Source Connector

Usage of Debezium Source Connector for PostgreSQL to send RDMS table updates into a topic.

The debezium/debezium-connector-postgresql:1.7.1 connector has been installed into connect docker image using confluent hub (see docker-compose.yml file).

Run cluster:

scripts/bootstrap-cdc.sh

The connector uses pgoutput plugin for replication. This plug-in is always present in PostgreSQL server. The Debezium connector interprets the raw replication event stream directly into change events.

Verify the existence of account table and data in PostgreSQL:

docker exec -it postgres psql -h localhost -p 5432 -U postgres -c 'select * from accounts;'
 user_id | username | password |    email     |         created_on         |         last_login
---------+----------+----------+--------------+----------------------------+----------------------------
       1 | foo      | bar      | foo@bar.com  | 2023-10-16 10:48:08.595034 | 2023-10-16 10:48:08.595034
       2 | foo2     | bar2     | foo2@bar.com | 2023-10-16 10:48:08.596646 | 2023-10-16 10:48:08.596646
       3 | foo3     | bar3     | foo3@bar.com | 2023-10-16 10:51:22.671384 | 2023-10-16 10:51:22.671384
       4 | foo4     | bar4     | foo4@bar.com | 2024-02-28 12:12:08.665137 | 2024-02-28 12:12:08.665137

Deploy the connector:

curl -v -X POST -H 'Content-Type: application/json' -d @cdc-debezium-postgres/config/debezium-source-pgsql.json http://localhost:8083/connectors

Run a consumer on postgres.public.accounts topic and see the records:

docker exec -it broker kafka-console-consumer --topic postgres.public.accounts --bootstrap-server broker:9092 --from-beginning --property print.key=true --property print.value=false

Insert a new record into account table:

docker exec -it postgres psql -h localhost -p 5432 -U postgres -c "insert into accounts (user_id, username, password, email, created_on, last_login) values (3, 'foo3', 'bar3', 'foo3@bar.com', current_timestamp, current_timestamp);"

Verify in consumer log the existence of 3 records:

Struct{user_id=1}
Struct{user_id=2}
Struct{user_id=3}

Teardown:

scripts/tear-down-cdc.sh

CDC with Debezium Informix Source Connector

Usage of Debezium Source Connector for Informix to send RDMS table updates into a topic.

Run environment:

scripts/bootstrap-cdc-informix.sh

Perform the following tasks to prepare for using the Change Data Capture API and create tables on iot database:

docker exec -it ifx /bin/bash
export DBDATE=Y4MD
dbaccess iot /opt/ibm/informix/etc/syscdcv1.sql
dbaccess iot /tmp/informix_ddl_sample.sql
exit

Deploy the connector:

curl -v -X POST -H 'Content-Type: application/json' -d @cdc-debezium-informix/config/debezium-source-informix.json http://localhost:8083/connectors

Run a consumer on test.informix.cust_db topic and see the records (expect to see 6 records):

kafka-avro-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic test.informix.cust_db --property schema.registry.url=http://localhost:8081
{"before":null,"after":{"test.informix.cust_db.Value":{"c_key":"\u0004W","c_status":{"string":"Z"},"c_date":{"int":19100}}},"source":{"version":"2.6.1.Final","connector":"informix","name":"test","ts_ms":1713272938000,"snapshot":{"string":"first"},"db":"iot","sequence":null,"ts_us":1713272938000000,"ts_ns":1713272938000000000,"schema":"informix","table":"cust_db","commit_lsn":{"string":"21484679168"},"change_lsn":null,"txId":null,"begin_lsn":null},"op":"r","ts_ms":{"long":1713272939104},"ts_us":{"long":1713272939104761},"ts_ns":{"long":1713272939104761000},"transaction":null}

{"before":null,"after":{"test.informix.cust_db.Value":{"c_key":"\b®","c_status":{"string":"Z"},"c_date":{"int":18735}}},"source":{"version":"2.6.1.Final","connector":"informix","name":"test","ts_ms":1713272938000,"snapshot":{"string":"true"},"db":"iot","sequence":null,"ts_us":1713272938000000,"ts_ns":1713272938000000000,"schema":"informix","table":"cust_db","commit_lsn":{"string":"21484679168"},"change_lsn":null,"txId":null,"begin_lsn":null},"op":"r","ts_ms":{"long":1713272939105},"ts_us":{"long":1713272939105769},"ts_ns":{"long":1713272939105769000},"transaction":null}

{"before":null,"after":{"test.informix.cust_db.Value":{"c_key":"\r\u0005","c_status":{"string":"Z"},"c_date":{"int":18370}}},"source":{"version":"2.6.1.Final","connector":"informix","name":"test","ts_ms":1713272938000,"snapshot":{"string":"true"},"db":"iot","sequence":null,"ts_us":1713272938000000,"ts_ns":1713272938000000000,"schema":"informix","table":"cust_db","commit_lsn":{"string":"21484679168"},"change_lsn":null,"txId":null,"begin_lsn":null},"op":"r","ts_ms":{"long":1713272939105},"ts_us":{"long":1713272939105848},"ts_ns":{"long":1713272939105848000},"transaction":null}

{"before":null,"after":{"test.informix.cust_db.Value":{"c_key":"\u0011\\","c_status":{"string":"Z"},"c_date":{"int":18004}}},"source":{"version":"2.6.1.Final","connector":"informix","name":"test","ts_ms":1713272938000,"snapshot":{"string":"true"},"db":"iot","sequence":null,"ts_us":1713272938000000,"ts_ns":1713272938000000000,"schema":"informix","table":"cust_db","commit_lsn":{"string":"21484679168"},"change_lsn":null,"txId":null,"begin_lsn":null},"op":"r","ts_ms":{"long":1713272939105},"ts_us":{"long":1713272939105931},"ts_ns":{"long":1713272939105931000},"transaction":null}

{"before":null,"after":{"test.informix.cust_db.Value":{"c_key":"\u0015³","c_status":{"string":"Z"},"c_date":{"int":17639}}},"source":{"version":"2.6.1.Final","connector":"informix","name":"test","ts_ms":1713272938000,"snapshot":{"string":"true"},"db":"iot","sequence":null,"ts_us":1713272938000000,"ts_ns":1713272938000000000,"schema":"informix","table":"cust_db","commit_lsn":{"string":"21484679168"},"change_lsn":null,"txId":null,"begin_lsn":null},"op":"r","ts_ms":{"long":1713272939105},"ts_us":{"long":1713272939105984},"ts_ns":{"long":1713272939105984000},"transaction":null}

{"before":null,"after":{"test.informix.cust_db.Value":{"c_key":"\u001A\n","c_status":{"string":"Z"},"c_date":{"int":17274}}},"source":{"version":"2.6.1.Final","connector":"informix","name":"test","ts_ms":1713272938000,"snapshot":{"string":"last"},"db":"iot","sequence":null,"ts_us":1713272938000000,"ts_ns":1713272938000000000,"schema":"informix","table":"cust_db","commit_lsn":{"string":"21484679168"},"change_lsn":null,"txId":null,"begin_lsn":null},"op":"r","ts_ms":{"long":1713272939106},"ts_us":{"long":1713272939106252},"ts_ns":{"long":1713272939106252000},"transaction":null}

Teardown:

scripts/tear-down-cdc-informix.sh

CDC with Debezium MongoDB Source Connector and Outbox Event Router

Usage of Debezium Source Connector for MongoDB to send updates into a topic. This example will use Debezium Event Router to implement a scenario for the Outbox pattern.

Run environment:

scripts/bootstrap-cdc-mongo.sh

Script will create a user data-platform-cdc with the privileges required to run CDC:

use admin
db.createRole({
   role: "CDCRole",
   privileges: [
      { resource: { cluster: true }, actions: ["find", "changeStream"] },
      { resource: { db: "outbox", collection: "loans" }, actions: [ "find", "changeStream" ] }
   ],
   roles: []
});

db.createUser({
  user: "data-platform-cdc",
  pwd: "password",
  roles: [
    { role: "read", db: "admin" },
    { role: "clusterMonitor", db: "admin" },
    { role: "read", db: "config" },
    { role: "read", db: "outbox" },
    { role: "CDCRole", db: "admin"}
  ]
});

Script will also insert a document into outbox database and loans collection:

{
  "aggregateId": "012313",
  "aggregateType": "Consumer Loan",
  "topicName": "CONSUMER_LOAN",
  "eventDate": "2024-08-20T09:42:02.665Z",
  "eventId": 1,
  "eventType": "INSTALLMENT_PAYMENT",
  "payload": {
    "amount": "200000"
  }
}

Script will deploy a source connector. Topic destination will be extracted from field topicName in document. Message Key will be set using filed aggregateId:

{
  "name": "mongo-debezium-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.connection.string": "mongodb://mongo:27017/?replicaSet=rs0",
    "topic.prefix": "test",
    "database.include.list" : "outbox",
    "collection.include.list" : "outbox.loans",
    "mongodb.user" : "data-platform-cdc",
    "mongodb.password" : "password",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "outbox,unwrap",
    "transforms.outbox.type": "io.debezium.connector.mongodb.transforms.outbox.MongoEventRouter",
    "transforms.outbox.tracing.span.context.field": "propagation",
    "transforms.outbox.tracing.with.context.field.only": "false",
    "transforms.outbox.tracing.operation.name": "debezium-read",
    "transforms.outbox.collection.field.event.key": "aggregateId",
    "transforms.outbox.collection.field.event.id": "aggregateId",
    "transforms.outbox.collection.field.event.payload": "payload",
    "transforms.outbox.collection.expand.json.payload": "true",
    "transforms.outbox.collection.fields.additional.placement": "aggregateType:header:aggregateType,eventDate:header:eventTime,eventType:header:type,eventId:header:id",
    "transforms.outbox.route.by.field": "topicName",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.operation.header": "false",
    "transforms.unwrap.delete.handling.mode": "drop",
    "transforms.unwrap.array.encoding": "array"
  }
}

Run a consumer on CONSUMER_LOAN topic and see the records (headers - key - value):

docker exec -it broker kafka-console-consumer --bootstrap-server broker:9092 --topic CONSUMER_LOAN --from-beginning --property print.key=true --property print.headers=true
id:012313,aggregateType:Consumer Loan,eventTime:2024-08-20T09:42:02.665Z,type:INSTALLMENT_PAYMENT,id:1	012313	{"amount":"200000"}

Teardown:

scripts/tear-down-cdc-mongo.sh

Tasks distributions using a Datagen Source Connector

This example will show how tasks are automatically balanced between Running worker nodes.

A connect cluster will be created with 2 workers, connect and connect2 and using a Datagen Source Connector with 4 tasks continuously inserting data.

After some seconds connect2 will be stopped and all tasks will be redistributed to connect worker node.

Run sample:

scripts/bootstrap-connect-tasks.sh

You will first see tasks distributed between the 2 Running workers:

{"datagen-sample":{"status":{"name":"datagen-sample","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"},{"id":1,"state":"RUNNING","worker_id":"connect2:8083"},{"id":2,"state":"RUNNING","worker_id":"connect:8083"},{"id":3,"state":"RUNNING","worker_id":"connect2:8083"}],"type":"source"}}}

After stopping connect2, you will see tasks only distributed to connect worker:

{"datagen-sample":{"status":{"name":"datagen-sample","connector":{"state":"RUNNING","worker_id":"connect:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"connect:8083"},{"id":1,"state":"RUNNING","worker_id":"connect:8083"},{"id":2,"state":"RUNNING","worker_id":"connect:8083"},{"id":3,"state":"RUNNING","worker_id":"connect:8083"}],"type":"source"}}}

Teardown:

scripts/tear-down-connect-tasks.sh

Apache Kafka® Streams

Folder: kafka-streams

Implementation of a series of Apache Kafka® Streams topologies.

Execute tests:

cd kafka-streams
mvn clean test

Events counter Stream

Count number of events grouped by key.

Create topics:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic counter-input-topic --replication-factor 1 --partitions 2

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic counter-output-topic --replication-factor 1 --partitions 2

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.StreamCounter"

Send messages to counter-input-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic counter-input-topic --property "parse.key=true" --property "key.separator=:"
"John":"transaction_1"
"Mark":"transaction_1"
"John":"transaction_2"

Read from counter-output-topic topic:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic counter-output-topic --bootstrap-server broker:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"

Sum values Stream

Sum values grouping by key.

Create topics:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic sum-input-topic --replication-factor 1 --partitions 2

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic sum-output-topic --replication-factor 1 --partitions 2

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.StreamSum"

Send messages to sum-input-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic sum-input-topic --property "parse.key=true" --property "key.separator=:"
"John":1
"Mark":2
"John":5

Read from sum-output-topic topic:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic sum-output-topic --bootstrap-server broker:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"

Cars sensor Stream

The stream filters out speed data from car data sensor records. Speed limit is set to 150km/h and only events exceeding the limits are filtered out.

A KTable stores the car info data.
A left join between the KStream and the KTable produces a new aggregated object published to an output topic.

Create carinfo-topic, carsensor-topic and carsensor-output-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic carinfo-topic --replication-factor 1 --partitions 2

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic carsensor-topic --replication-factor 1 --partitions 2

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic carsensor-output-topic --replication-factor 1 --partitions 2

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.CarSensorStream"

Send messages to carinfo-topic and carsensor-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic carinfo-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}
docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic carsensor-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","speed":350}

Read from carsensor-output-topic topic:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic carsensor-output-topic --bootstrap-server broker:9092 --from-beginning --property print.key=true --property key.separator=" : "

Cars brand Stream

The stream splits the original data into 2 different topics, one for Ferrari cars and one for all other car brands.

Create cars-input-topic, ferrari-input-topic and cars-output-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic cars-input-topic --replication-factor 1 --partitions 2

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic ferrari-input-topic --replication-factor 1 --partitions 2

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic cars-output-topic --replication-factor 1 --partitions 2

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.stream.CarBrandStream"

Send messages to cars-input-topic topic:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic cars-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}
2:{"id":"2","brand":"Bugatti","model":"Chiron"}

Read from ferrari-input-topic and cars-output-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic ferrari-input-topic --bootstrap-server broker:9092 --from-beginning --property print.key=true --property key.separator=" : "
docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic cars-output-topic --bootstrap-server broker:9092 --from-beginning --property print.key=true --property key.separator=" : "

JSONArray Fields removal Processor

Remove a specific json field from the record and forward it to the next topology node. This example uses Kafka Streams Processor API.

Execute tests:

cd kafka-streams-processor
mvn clean test

Create processor-input-topic and processor-output-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic processor-input-topic --replication-factor 1 --partitions 2

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic processor-output-topic --replication-factor 1 --partitions 2

Run the topology:

cd kafka-streams
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.processor.JSONArrayRemoveProcessorApplication"

Send messages to processor-input-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic processor-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","brand":"Ferrari","model":"F40"}

Read from processor-output-topic topic:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic processor-output-topic --bootstrap-server broker:9092 --from-beginning --property print.key=true --property key.separator=" : "

Expired Messages Processor

Remove old entries based on time (expiration time set to 30 seconds) using a punctuator. This example uses Kafka Streams Processor API.

Execute tests:

cd kafka-streams-processor
mvn clean test

Create expired-messages-input-topic and expired-messages-output-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic expired-messages-input-topic --replication-factor 1 --partitions 2

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic expired-messages-output-topic --replication-factor 1 --partitions 2

Run the topology:

cd kafka-streams-processor
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.streams.processor.ExpiredMessagesApplication"

Send messages to expired-messages-input-topic topics:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --topic expired-messages-input-topic --property "parse.key=true" --property "key.separator=:"
1:{"id":"1","remote-device":"R01","time":"2021-11-02T02:50:12.208Z"}

Read from expired-messages-output-topic topic:

docker exec -it broker /opt/kafka/bin/kafka-console-consumer.sh --topic expired-messages-output-topic --bootstrap-server broker:9092 --from-beginning --property print.key=true --property key.separator=" : "

Find out in kafka streams application log, expiration entries:

[expired-messages-app-073d6f11-585b-4e69-b91f-bc998bdf49f3-StreamThread-1] INFO org.hifly.kafka.demo.streams.processor.ExpiredMessagesProcessor - 1 is expired --> Sessions between:99849

Interactive queries

Folder: kafka-streams

Class org.hifly.kafka.demo.streams.queries.QueryController shows how to execute queries against the local state store. Controller is attached at the following streams example:

When running the StreamCounter example, check in the terminal for entries like:

query result <key>: <value>

Kafka Streams application deployed on Kubernetes

In this example a stateful Kafka Stream Application, using RocksDB will be deployed on a Kubernetes cluster using a StatefulSet.

The application uses a window store of 5 minutes duration to count the number of words.

In order to run the example you need to provision first a Kafka cluster on Kubernetes. You can follow the example in this repo in section "Installation on Kubernetes using Confluent For Kubernetes" for running a cluster on Minikube.

The application in this demo requires a kafka broker bound on address: "kafka:9071".

After installed the kafka cluster on k8s, you need first to create a Docker Image for the demo application and register in your Kubernetes Docker Registry. The next commands show how to do it with Minikube and docker driver.

cd kafka-streams-k8s
mvn clean package
# Valid if you are suing minikube with docker driver
eval $(minikube docker-env)
docker build -t kafka-streams-app .

Verify that the image is listed in your Kubernetes Docker Registry:

$ docker images
REPOSITORY                                TAG         IMAGE ID       CREATED         SIZE
kafka-streams-app                         latest      9facf1537335   2 seconds ago   322MB
confluentinc/confluent-operator           0.1033.33   1223f152dab7   6 weeks ago     146MB

Create the topics required for the demo:

kubectl exec --stdin --tty kafka-0 -- /bin/bash

kafka-topics --bootstrap-server localhost:9092 --create --topic words-input-topic --partitions 6
kafka-topics --bootstrap-server localhost:9092 --create --topic words-counter-output-topic --partitions 6
exit

Now you can deploy the kafka stream application on Kubernetes suing a StatefulSet and a Headless Service:

kubectl apply -f k8s/statefulset.yml
kubectl apply -f k8s/svc.yml
kubectl apply -f k8s/svc-promethues.yml

Verify that 3 instances of the application are running:

$ kubectl get pods
NAME                                 READY   STATUS    RESTARTS   AGE
confluent-operator-76c899cf8-pl6ng   1/1     Running   0          3m6s
kafka-0                              1/1     Running   0          96s
kafka-1                              1/1     Running   0          96s
kafka-2                              1/1     Running   0          96s
kafka-streams-app-0                  1/1     Running   0          43s
kafka-streams-app-1                  1/1     Running   0          40s
kafka-streams-app-2                  1/1     Running   0          38s
kraftcontroller-0                    1/1     Running   0          2m49s

Verify the PV for rocksdb (1GB each):

$ kubectl get pv
NAME                                       CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS   CLAIM                                           STORAGECLASS   VOLUMEATTRIBUTESCLASS   REASON   AGE
pvc-52323f68-fab8-40b1-b1d1-4c7da9e214d8   1Gi        RWO            Delete           Bound    confluent/data0-kafka-2                         standard       <unset>                          5m42s
pvc-7858f673-19fa-4e8c-80b0-c90b2481db8a   1Gi        RWO            Delete           Bound    confluent/data0-kafka-0                         standard       <unset>                          5m42s
pvc-af3d40d8-4ca8-457a-a8a2-e4f9d0d640a6   1G         RWO            Delete           Bound    confluent/data0-kraftcontroller-0               standard       <unset>                          6m56s
pvc-b226b4f3-53a3-4c98-ab02-49e88397ea8c   1Gi        RWO            Delete           Bound    confluent/rocksdb-storage-kafka-streams-app-2   standard       <unset>                          3m9s
pvc-c3c8459c-9228-491d-91ae-c5af4d47e0e7   1Gi        RWO            Delete           Bound    confluent/rocksdb-storage-kafka-streams-app-0   standard       <unset>                          3m9s
pvc-cf5471eb-85e1-42d3-a8a3-a74fbe7c9b5f   1Gi        RWO            Delete           Bound    confluent/data0-kafka-1                         standard       <unset>                          5m42s
pvc-f43467a9-b644-4ef5-8890-3c956a3805d3   1Gi        RWO            Delete           Bound    confluent/rocksdb-storage-kafka-streams-app-1   standard       <unset>                          3m9s

Check the log of the application to evaluate the application status (RUNNING):

$ kubectl logs -f kafka-streams-app-0

[wordscounter_app-9752ab10-9474-4b55-b73d-bb4985137ee6-StreamThread-2] INFO org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [wordscounter_app-9752ab10-9474-4b55-b73d-bb4985137ee6-StreamThread-2] State transition from PARTITIONS_ASSIGNED to RUNNING
[wordscounter_app-9752ab10-9474-4b55-b73d-bb4985137ee6-StreamThread-2] INFO org.apache.kafka.streams.KafkaStreams - stream-client [wordscounter_app-9752ab10-9474-4b55-b73d-bb4985137ee6] State transition from REBALANCING to RUNNING

Produce some input data:

kubectl exec --stdin --tty kafka-0 -- /bin/bash

kafka-console-producer --bootstrap-server localhost:9092 --topic words-input-topic --property "key.separator=:" --property "parse.key=true"

user1:hello world
user2:hello kafka streams
user1:hello world
user3:kafka streams example

Verify output data, this will display the count of words in 5-minute windows:

kubectl exec --stdin --tty kafka-0 -- /bin/bash

kafka-console-consumer --bootstrap-server localhost:9092 --topic  words-counter-output-topic --from-beginning --property "print.key=true" --property "key.separator=:" --value-deserializer "org.apache.kafka.common.serialization.LongDeserializer"

ksqlDB

Saga Pattern Example using ksqlDB as an orchestrator

Implementation of a sample App (producer and consumer) sending and receiving orders; ksqlDB acts as an orchestrator to coordinate a sample Saga.

ksqlDB is needed to run the example.

More Info at: https://ksqldb.io/

Start ksqlDB:

scripts/bootstrap-ksqldb.sh

Compile:

cd ksqldb-saga-example
mvn schema-registry:download
mvn generate-sources
mvn clean compile

Connect to ksqlDB and set auto.offset.reset property:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit

Execute DDLs on ksqlDB:

cd ksqldb-saga-example/ksql
./ksql-statements.sh

Create a fat jar with the Sample application (1 Saga):

cd ksqldb-saga-example
mvn clean compile assembly:single

Execute Sample application (1 Saga):

cd ksqldb-saga-example
java -jar target/ksqldb-saga-example-1.2.1-jar-with-dependencies.jar

Saga Verification:

Insert entries on ksqlDB:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
insert into accounts values('AAA', 'Jimmy Best');
insert into orders values('AAA', 150, 'Item0', 'A123', 'Jimmy Best', 'Transfer funds', '2020-04-22 03:19:51');
insert into orders values('AAA', -110, 'Item1', 'A123', 'amazon.it', 'Purchase', '2020-04-22 03:19:55');
insert into orders values('AAA', -100, 'Item2', 'A123', 'ebike.com', 'Purchase', '2020-04-22 03:19:58');

select * from orders_tx where account_id='AAA' and order_id='A123';
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": 0, "ACCOUNT": "AAA", "ITEMS": ["Item0"], "ORDER": "A123"}
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": 0, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1"], "ORDER": "A123"}
Order Action:{"TX_ID": "TX_AAA_A123", "TX_ACTION": -1, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1", "Item2"], "ORDER": "A123"}
 --> compensate:{"TX_ID": "TX_AAA_A123", "TX_ACTION": -1, "ACCOUNT": "AAA", "ITEMS": ["Item0", "Item1", "Item2", "ORDER": "A123"}

Teardown:

scripts/tear-down-ksqldb.sh

Windowing

Tumbling Window example: heart rate monitoring

Implementation of a tumbling window (1 minute) to monitor heart rate. Values over a threshold of 120 beats per minute are reported.

ksqlDB is needed to run the example.

More Info at: https://ksqldb.io/

Start ksqlDB:

scripts/bootstrap-ksqldb.sh

Connect to ksqlDB and set auto.offset.reset property:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit

Execute DDLs on ksqlDB:

cd ksqldb-window-tumbling-heartbeat/ksql
./ksql-statements.sh

Insert entries on ksqlDB:

cd ksqldb-window-tumbling-heartbeat/ksql
./ksql-inserts.sh

Verify results:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

SELECT person_id,
       beat_over_threshold_count,
       TIMESTAMPTOSTRING(window_start, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_start,
       TIMESTAMPTOSTRING(window_end, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_end
FROM heartbeat_60sec
EMIT CHANGES;

+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|PERSON_ID                                                |BEAT_OVER_THRESHOLD_COUNT                                |WINDOW_START                                             |WINDOW_END                                               |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|MGG1                                                     |3                                                        |2023-02-18 15:10:00                                      |2023-02-18 15:11:00                                      |
|MGG1                                                     |10                                                       |2023-02-18 15:15:00                                      |2023-02-18 15:16:00                                      |

Teardown:

scripts/tear-down-ksqldb.sh

Session Window example: Vehicle Positions

Implementation of a session window (5 minutes inactive). Vehicle positions (latitude and logitude) are collected and a new window opens when the vehicle does not send its position for 5 minutes. This is considered as a new "trip".

ksqlDB is needed to run the example.

More Info at: https://ksqldb.io/

Start ksqlDB:

scripts/bootstrap-ksqldb.sh

Connect to ksqlDB and set auto.offset.reset property:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
SET 'auto.offset.reset' = 'earliest';
exit

Execute DDLs on ksqlDB:

cd ksqldb-window-session-tripsegments/ksql
./ksql-statements.sh

Insert entries on ksqlDB:

cd ksqldb-window-session-tripsegments/ksql
./ksql-inserts.sh

Verify results:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

SELECT vehicle_id,
       positions_sent,
       start_latitude,
       start_longitude,
       end_latitude,
       end_longitude,
       TIMESTAMPTOSTRING(window_start, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_start,
       TIMESTAMPTOSTRING(window_end, 'yyy-MM-dd HH:mm:ss', 'UTC') as window_end
FROM trips
EMIT CHANGES;


+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|VEHICLE_ID                 |POSITIONS_SENT             |START_LATITUDE             |START_LONGITUDE            |END_LATITUDE               |END_LONGITUDE              |WINDOW_START               |WINDOW_END                 |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|VH1                        |5                          |42.21                      |17.12                      |42.28                      |17.16                      |2023-02-18 15:10:00        |2023-02-18 15:13:00        |
|VH1                        |2                          |42.31                      |17.17                      |42.33                      |17.18                      |2023-02-18 15:20:00        |2023-02-18 15:22:00        |

Teardown:

scripts/tear-down-ksqldb.sh

Joins

Outer: Devices and temperature measurement

Folder: ksqldb-join

This example shows how to join a STREAM with air temperatures captured by devices and a TABLE containing the information of devices.

Air Temperatures are ingested into a topic temperature.data with a RabbitMQ Source Connector.

Device Info are ingested into a topic device with a JDBC Source Connector.

Launch Docker Compose:

scripts/bootstrap-ksqldb-join.sh

Create device and temperature.data topics:

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic device --replication-factor 1 --partitions 1

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic temperature.data --replication-factor 1 --partitions 1

Deploy the JDBC Source connector:

{
"name" : "pgsql-sample-source",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:postgresql://postgres:5432/postgres",
"connection.user": "postgres",
"connection.password": "postgres",
"topic.prefix": "",
"poll.interval.ms" : 3600000,
"table.whitelist" : "public.device",
"mode":"bulk",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"transforms":"createKey,extractInt",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id",
"transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field":"id"
}
}
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_jdbc_source.json

Send data to a RabbitMQ queue temperature.queue with a confluent-python producer (5 different devices):

pip3 install pika --upgrade
ksqldb-join/config/rabbit_producer.py temperature.queue 5

-->
count:	5
queue:	temperature.queue
Send	{'id': 0, 'body': 35}
Send	{'id': 1, 'body': 18}
Send	{'id': 2, 'body': 2}
Send	{'id': 3, 'body': 5}
Send	{'id': 4, 'body': 32}
Exiting
{
  "name" : "rabbitmq-sample-source",
  "config": {
    "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector",
    "confluent.topic.bootstrap.servers": "broker:9092",
    "confluent.topic.replication.factor": "1",
    "kafka.topic": "temperature.data",
    "rabbitmq.queue" : "temperature.queue",
    "rabbitmq.host" : "rabbitmq",
    "rabbitmq.username" : "guest",
    "rabbitmq.password" : "guest",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_rabbitmq_source.json

Execute the ksqlDB statements; Stream DEVICE_TEMPERATURE is a INNER JOIN between DEVICE and TEMPERATURE.DATA

cd ksqldb-join/ksql
./ksql-statements.sh

Inner Join

Folder: ksqldb-join

Verify the enrichment with a query:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_TEMPERATURE EMIT CHANGES"

-->
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|DEVICE_ID                                                                    |FULLNAME                                                                     |TEMPERATURE                                                                         |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|1                                                                            |foo11111                                                                     |18                                                                           |
|2                                                                            |foo22222                                                                     |2                                                                            |

Left Join

Folder: ksqldb-join

Verify the enrichment with a query:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_TEMPERATURE_LJ EMIT CHANGES"

-->
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|DEVICE_ID                                                                  |FULLNAME                                                                   |TEMPERATURE                                                                |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|0                                                                          |null                                                                       |15                                                                         |
|1                                                                          |foo11111                                                                   |13                                                                         |
|2                                                                          |foo22222                                                                   |16                                                                         |
|3                                                                          |null                                                                       |34                                                                         |
|4                                                                          |null                                                                       |8                                                                          |

Right Join: Devices and devices maintenance

Folder: ksqldb-join

This example shows how to join a Table and a Table

Device Info are ingested into a topic device with a JDBC Source Connector.

Maintenances are ingested into a topic maintenance with a JDBC Source Connector.

Launch Docker Compose:

scripts/bootstrap-ksqldb-join.sh

Create device and maintenance topics:

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic device --replication-factor 1 --partitions 1
docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic maintenance --replication-factor 1 --partitions 1

Deploy the JDBC Source connector:

{
  "name" : "pgsql-sample2-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "topic.prefix": "",
    "poll.interval.ms" : 3600000,
    "table.whitelist" : "public.maintenance",
    "mode":"bulk",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "transforms":"createKey,extractInt",
    "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields":"id",
    "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractInt.field":"id"
  }
}
curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_jdbc_source.json

curl -X POST -H Accept:application/json -H Content-Type:application/json http://localhost:8083/connectors/ -d @ksqldb-join/config/connector_device_maintenance_jdbc_source.json

Execute the ksqlDB statements: TABLE MAINTENANCE RIGHT JOIN TABLE DEVICE

cd ksqldb-join/ksql
./ksql-statements-rj.sh
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 --execute "select * from DEVICE_MAINTENANCE EMIT CHANGES"

-->
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|DEVICE_ID                                                                  |FULLNAME                                                                   |MAINTENANCE                                                                |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|1                                                                          |foo11111                                                                   |2023-03-01 15:00:00 16:00:00                                               |
|2                                                                          |foo22222                                                                   |null                                                                       |
|10                                                                         |foo1010101010                                                              |null                                                                       |
|15                                                                         |foo1515151515                                                              |null                                                                       |

Teardown:

scripts/tear-down-ksqldb.sh

Windowing

Tumbling Window example: heart rate monitoring

Implementation of a tumbling window (1 minute) to monitor heart rate. Values over a threshold of 120 beats per minute are reported.

SQL statements are contained in file: heartbeats.sql

Bootstrap:

scripts/bootstrap-flink.sh

Create topic heartbeat:

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic heartbeat --replication-factor 1 --partitions 1

Execute Flink Job:

#Connect to sql-client container
docker exec -it sql-client bash
#launch sql statements
root@44c67639b002~$ sql-client.sh -f app/heartbeats.sql

Check if the job insert-into_default_catalog.default_database.heartbeat_60sec is running in Flink Web Console at: http://localhost:18081/#/job/running

Validate results, consuming from output topic heartbeat_60sec:

docker exec -e SCHEMA_REGISTRY_LOG4J_OPTS=" " -it schema-registry /usr/bin/kafka-avro-console-consumer \
  --topic heartbeat_60sec \
  --from-beginning \
  --bootstrap-server broker:9092
{"window_start":{"string":"2023-02-18 15:10:00"},"window_end":{"string":"2023-02-18 15:11:00"},"heartbeats_over_120":{"long":3}}

{"window_start":{"string":"2023-02-18 15:15:00"},"window_end":{"string":"2023-02-18 15:16:00"},"heartbeats_over_120":{"long":10}}

Teardown:

scripts/tear-down-flink.sh

Transactions

Transactional producer

Folder: kafka-producer

It uses org.apache.kafka.common.serialization.StringSerializer class for key and value serialization and set properties enable.idempoteceny to true and transactional.id to testTx

Create topic test-idempotent with 3 partitions:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic test-idempotent --replication-factor 1 --partitions 3

Produce on topic test-idempotent:

cd kafka-producer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.producer.tx.Runner"

Read Committed consumer

Folder: kafka-consumer

It uses org.apache.kafka.common.serialization.StringDeserializer for key and value deserialization and set isolation.level to read_committed.

Important
It must be only used with a transactional producer.
cd kafka-consumer
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.consumer.tx.Runner"

Orders App: Example with end-to-end exactly-once semantic between consumer and producer

Folder: kafka-orders-tx

Example of a cart application implementing end-to-end exactly-once semantic between consumer and producer.

  • ItemsProducer class sends 2 items in a single transaction.

  • ItemsConsumer class receives the items and creates an order containing the items.

  • The consumer offset is committed only if the order can be created and sent.

Execute tests:

cd kafka-orders-tx
mvn clean test

Execute ItemsProducer class:

cd kafka-orders-tx
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.orders.ItemsProducer"

Execute ItemsConsumer class:

cd kafka-orders-tx
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.orders.ItemsConsumer"

Frameworks

Spring Boot

Example for a producer and consumer implemented with Spring Kafka 2.x.

Consumer implements a DLQ; implementation configures:

  • 3 retriable topics, -retry-0 (backoff 1 seconds), -retry-1 (backoff 2 seconds), -retry-2 (backoff 4 seconds)

  • 1 DLT topic, -dlt, for the main topic.

Offending messages will be retried without blocking consuming of messages. After exhausting the retries, messages will be sent to DLT.

Run on local machine:

#start a producer on port 8010
cd kafka-springboot-producer
mvn spring-boot:run

#start a consumer on port 8090
cd kafka-springboot-consumer
mvn spring-boot:run

#Send orders (on topic demoTopic)
curl --data '{"id":5, "name": "PS5"}' -H "Content-Type:application/json" http://localhost:8010/api/order

#Send ERROR orders and test DLQ (on topic demoTopic)
curl --data '{"id":5, "name": "ERROR-PS5"}' -H "Content-Type:application/json" http://localhost:8010/api/order

Quarkus

Example for a producer and consumer implemented with Quarkus extension for Apache Kafka. Every 1s a new message is sent to demo topic.

Run on local machine:

cd kafka-quarkus
#(debug port 5005)
./mvnw clean compile quarkus:dev

Run on Openshift machine:

cd kafka-quarkus
./mvnw clean package -Dquarkus.container-image.build=true -Dquarkus.kubernetes.deploy=true

Open Liberty MicroProfile v2

Example for a kafka producer and consumer running on an open liberty MicroProfile v2 runtime.

Run on docker:

#Start a kafka container
docker run -d --name my-cluster-kafka-bootstrap -p 9092:9092 apache/kafka

#Start a kafka producer container
cd kafka-microprofile2-producer
docker build -t kafka-producer:latest .
docker run -d --name kafka-producer -p 9080:9080 -e KAFKABROKERLIST=my-cluster-kafka-bootstrap:9092 --link my-cluster-kafka-bootstrap:my-cluster-kafka-bootstrap kafka-producer:latest

#Start a kafka consumer container
cd kafka-microprofile2-consumer
docker build -t kafka-consumer:latest .
docker run -d --name kafka-consumer -p 9090:9080 -e KAFKABROKERLIST=my-cluster-kafka-bootstrap:9092 --link my-cluster-kafka-bootstrap:my-cluster-kafka-bootstrap kafka-consumer:latest

#Receive orders
curl -v -X POST http://localhost:9090/kafka-microprofile2-consumer-1.2.1/order

#Send orders (500)
curl -v -X POST http://localhost:9080/kafka-microprofile2-producer-1.2.1/order

GraalVM native images for Kafka clients

GraalVM for Java microservices using Kafka clients libraries and different authentication mechanisms.

GraalVM offers Native Image, which allows you to compile JVM-based applications ahead of time into native machine code. This results in faster startup times and lower memory consumption compared to regular JVM applications.

Pre Requisites

Create the native image

  • Create package:

mvn clean package
  • Create native image

native-image --no-fallback \
   --initialize-at-build-time=org.slf4j.LoggerFactory,org.slf4j.impl.StaticLoggerBinder,org.slf4j.impl.SimpleLogger \
   -H:ReflectionConfigurationFiles=src/main/resources/META-INF/native-image/reflect-config.json \
   -H:ResourceConfigurationFiles=src/main/resources/META-INF/native-image/resource-config.json \
   -H:DynamicProxyConfigurationFiles=src/main/resources/META-INF/native-image/proxy-config.json \
   -H:AdditionalSecurityProviders=com.sun.security.sasl.Provider \
   -H:Name=kafka-clients-graalvm \
   -jar target/kafka-clients-graalvm-1.2.1-jar-with-dependencies.jar

Testing

No Authentication
chmod +x kafka-clients-graalvm-1.2.1-jar-with-dependencies

./kafka-clients-graalvm-1.2.1-jar-with-dependencies

Produce message: Hello GraalVM Kafka!
Consumed message: Hello GraalVM Kafka!
SASL PLAIN Authentication with SSL

e.g. This is the typical scenario when connecting to Confluent Cloud.

chmod +x kafka-clients-graalvm-1.2.1-jar-with-dependencies

./kafka-clients-graalvm-1.2.1-jar-with-dependencies examples/producer.properties examples/consumer.properties

Produce message: Hello GraalVM Kafka!
Consumed message: Hello GraalVM Kafka!
SASL GSSAPI Authentication

Kafka with Kerberos using docker containers: https://github.com/Dabz/kafka-security-playbook

Start a kafka cluster with Kerberos:

cd kerberos
./up

or alternatively start a kafka cluster with Kerberos and a DNS server (required for dns_lookup_kdc=true):

cd kerberos
./up dns

IMPORTANT: dns_lookup_kdc=true version is at the moment not working properly: Caused by: javax.security.auth.login.LoginException: Cannot locate KDC

Wait for the containers to be up, then login into client container

docker exec -it client /bin/bash

From client container, run:

cd kafka-examples-master/kafka-clients-graalvm/ && mvn clean package && unzip target/kafka-clients-graalvm-1.2.1-jar-with-dependencies.jar

From client container, create native image:

/tmp/graalvm-jdk-17.0.12+8.1/bin/native-image --no-fallback \
--initialize-at-build-time=org.slf4j.LoggerFactory,org.slf4j.impl.StaticLoggerBinder,org.slf4j.impl.SimpleLogger,sun.security.jgss.krb5 \
-H:ReflectionConfigurationFiles=src/main/resources/META-INF/native-image/reflect-config.json \
-H:ResourceConfigurationFiles=src/main/resources/META-INF/native-image/resource-config.json \
-H:DynamicProxyConfigurationFiles=src/main/resources/META-INF/native-image/proxy-config.json \
-H:AdditionalSecurityProviders=sun.security.jgss.SunProvider,sun.security.provider.Sun,com.sun.security.sasl.Provider \
-H:Name=kafka-clients-graalvm \
-Djava.library.path=/tmp/kafka-examples-master/kafka-clients-graalvm/linux/aarch64 \
-jar target/kafka-clients-graalvm-1.2.1-jar-with-dependencies.jar

From client container, execute native image with GSSAPI auth:

chmod +x kafka-clients-graalvm-1.2.1-jar-with-dependencies

./kafka-clients-graalvm-1.2.1-jar-with-dependencies -Djava.library.path=/tmp/kafka-examples-master/kafka-clients-graalvm/linux/aarch64 /etc/kafka/producer.properties /etc/kafka/consumer.properties

Security

ACLs

Folder: acls

This example show how to set ACLs on topics for user alice. It uses as authorizer the default implementation class: org.apache.kafka.metadata.authorizer.StandardAuthorizer

Run the components:

scripts/bootstrap-acls.sh

Create test topic:

docker exec -it broker /opt/kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --command-config /tmp/admin.properties --create --topic test

Produce messages without an explicit ACL for user alice:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --producer.config /tmp/alice.properties --topic test --property "parse.key=true" --property "key.separator=:"
>1:test

[2023-07-12 12:18:27,972] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
[2023-07-12 12:18:27,974] ERROR Error when sending message to topic test with key: 1 bytes, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

Set topic read and topic write ACLs on topic test for user alice:

docker exec -it broker /opt/kafka/bin/kafka-acls.sh --bootstrap-server broker:9092 --command-config /tmp/admin.properties --add --allow-principal "User:alice" --operation Read --operation Write --topic test

Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
 	(principal=User:alice, host=*, operation=READ, permissionType=ALLOW)
	(principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
 	(principal=User:alice, host=*, operation=READ, permissionType=ALLOW)
	(principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)

Produce messages with user alice:

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --producer.config /tmp/alice.properties --topic test --property "parse.key=true" --property "key.separator=:"
>1:test

Teardown:

scripts/tear-down-acls.sh

SASL Plain with SSL listener

Folder: sasl-ssl

This example shows how to define a SASL Plain with SSL listener on port 9092.

Run the components:

scripts/bootstrap-sasl-ssl.sh

The script will create all the required security files for broker and a client application in sasl-ssl/secrets folder.

CA is a fake authority: C=IT/ST=Lazio/L=Rome/O=Hifly/OU=Hifly

List of files generated:

  • CA certificate

  • CA key

  • broker csr

  • broker certificate

  • broker keystore

  • broker truststore

  • client csr

  • client certificate

  • client keystore

  • client truststore

Then a broker on port 9092 will be started.

To test the connection, you can try producing some data to a topic client.properties

client CN is: CN=client,OU=Hifly,O=Hifly,L=Rome,ST=Lazio,C=IT

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --producer.config /tmp/client.properties --topic topic1 --property "parse.key=true" --property "key.separator=:"
1:test
2:test2

Teardown:

scripts/tear-down-sasl-ssl.sh

mTLS listener

Folder: mtls-listener

This example shows how to define a mTLS listener on listener port 9092.

mTLS ensures that both parties in the communication, client and broker, will trust each other exchanging SSL certificates.

Run the components:

scripts/bootstrap-mtls.sh

The script will create all the required security files for broker and a client application in mtls-listener/ssl folder.

CA is a fake authority: C=IT/ST=Lazio/L=Rome/O=Hifly/OU=Hifly

List of files generated:

  • CA certificate

  • CA key

  • broker csr

  • broker certificate

  • broker keystore

  • broker truststore

  • client csr

  • client certificate

  • client keystore

  • client truststore

Then a broker with mTLS on port 9092 will be started.

To test the mTLS connection, you can try producing some data to a topic with the client keystore already generated and using mTLS client.properties

Important
mTLS listener has been configured with hostname verification: ssl.endpoint.identification.algorithm= https

client CN is: CN=client,OU=Hifly,O=Hifly,L=Rome,ST=Lazio,C=IT

docker exec -it broker /opt/kafka/bin/kafka-console-producer.sh --broker-list broker:9092 --producer.config /tmp/client.properties --topic topic1 --property "parse.key=true" --property "key.separator=:"
1:test
2:test2

Teardown:

scripts/tear-down-mtls.sh

Multiple listeners

Folder: multi-listener

This example shows how to define 2 listeners, 1 INTERNAL on port 9092 and 1 for external clients on port 9093 with SASL PLAIN authentication and SSL enabled.

Run the example:

scripts/bootstrap-multi-listener.sh

The script will create all the required security files for broker and a client application in multi-listener/ssl folder.

CA is a fake authority: C=IT/ST=Lazio/L=Rome/O=Hifly/OU=Hifly

List of files generated:

  • CA certificate

  • CA key

  • broker csr

  • broker certificate

  • broker keystore

  • broker truststore

  • client truststore

To test the SASL PLAIN SSL connection, you can try producing some data to a topic using client.properties with user admin/admin_secret

docker exec -it broker kafka-console-producer --broker-list broker:9093 --producer.config /tmp/client.properties --topic topic1 --property "parse.key=true" --property "key.separator=:"
1:test
2:test2

Teardown:

scripts/tear-down-multi-listener.sh

Custom Authorizer

Folder: authorizers

This example shows how to create a custom authorizer.

Important
this example is only for demo purposes and it’s not intended to be deployed in production.

Custom Authorizer org.hifly.kafka.authorizer.DummyAuthirizer extends the basic AclAuthorizer and allows authenticated users to execute operations on topics without setting any ACLs on them.

Compile and package:

cd authorizers
mvn clean package
cp -rf ./target/authorizers-1.2.1.jar ./jars

Run broker with custom authorizer on port 9092:

scripts/bootstrap-auth.sh

Run a producer test using the producer.properties on listener port 9092:

Producer command:

docker exec -it broker kafka-console-producer --broker-list broker:9093 --topic test --producer.config /tmp/producer.properties

Run a consumer test using the consumer.properties on listener port 9092:

Consumer command:

docker exec -it broker kafka-console-consumer --broker-list broker:9093 --topic test --consumer.config /tmp/consumer.properties

Teardown:

scripts/tear-down-auth.sh

Custom Principal Builder

This example shows how to create a custom KafkaPrincipalBuilder to validate the value of CN attribute in SSL certificate.

Only the following CN are allowed:

  • CN=broker

  • CN=client

Important
this example is only for demo purposes and it’s not intended to be deployed in production.

Run the example:

scripts/bootstrap-principal.sh

The script will create all the required security files for broker and 2 client applications in principal-builder/ssl folder.

CA is a fake authority: C=IT/ST=Lazio/L=Rome/O=Hifly/OU=Hifly

List of files generated:

  • CA certificate

  • CA key

  • broker csr

  • broker certificate

  • broker keystore

  • broker truststore

  • client csr

  • client certificate

  • client keystore

  • client truststore

  • client2 csr

  • client2 certificate

  • client2 keystore

  • client2 truststore

Then a broker with mTLS on port 9092 will be started.

Important
mTLS listener has been configured with hostname verification: ssl.endpoint.identification.algorithm= https

To test the mTLS connection, you can try producing some data to a topic with the client keystore already generated and using mTLS client.properties

client CN is: CN=client,OU=Hifly,O=Hifly,L=Rome,ST=Lazio,C=IT

docker exec -it broker kafka-console-producer --broker-list broker:9093 --topic topic1 --producer.config /tmp/client.properties --property "parse.key=true" --property "key.separator=:"
1:test
2:test2

Messages will be sent with no error; in broker log, you will notice:

[2023-05-31 09:26:03,909] INFO SUBJECT:CN=client,OU=Hifly,O=Hifly,L=Rome,ST=Lazio,C=IT (org.hifly.kafka.principal.CustomPrincipalBuilder)

To test the mTLS connection with a not-allowed client, use the property file client2.properties

client CN is: CN=client2,OU=Hifly,O=Hifly,L=Rome,ST=Lazio,C=IT

docker exec -it broker kafka-console-producer --broker-list broker:9093 --topic topic1 --producer.config /tmp/client2.properties--property "parse.key=true" --property "key.separator=:"
1:test
2:test2

Messages will not be sent; in broker log, you will notice:

[2023-05-31 09:34:27,868] ERROR Exception while processing request from 192.168.80.3:9092-192.168.80.1:54098-18 (kafka.network.Processor)
java.lang.IllegalStateException: Can't gather SSL certificates!
	at org.hifly.kafka.principal.CustomPrincipalBuilder.build(CustomPrincipalBuilder.java:61)

Teardown:

scripts/tear-down-principal.sh

OAUTH Authentication: SASL/OAUTHBEARER

This example shows how to configure broker to use SASL/OAUTHBEARER authentication with Support for OIDC.

To run the sample you need to run Keycloak server and configure openid-connect on it.

Run Keycloak server with PostgreSQL (on port 8080) and then run cluster with OAUTH listener on port 9093:

scripts/bootstrap-oauth.sh

Keycloak setup:

 - Login to http://localhost:8080 (admin/Pa55w0rd)
 - Create a realm called kafka
 - From the Clients tab, create a client with Cliend ID "kafka_user".
 - Change Access Type to Confidential
 - Turn Standard Flow Enabled to OFF
 - Turn Service Accounts Enabled to ON
 - In the Advanced Settings below on the settings tab, set Access Token Lifespan to 10 minutes
 - Switch to the Credentials tab
 - Set Client Authenticator to "Client Id and Secret"
 - Copy the client-secret
 - Save

Run a producer test using the client-oauth-properties (add your client_secret into the file) on listener port 9093:

client-oauth-properties:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.login.connect.timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=http://localhost:8080/auth/realms/kafka/protocol/openid-connect/token
sasl.oauthbearer.expected.audience=account
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId="kafka_user" clientSecret="<client_secret>";

Producer command:

kafka-producer-perf-test --topic my_topic --num-records 50 --throughput 10 --record-size 1 --producer-props bootstrap.servers=localhost:9093  --producer.config kafka-oauth-kip-768/client-oauth.properties

Teardown:

scripts/tear-down-oauth.sh

Service Quota

Network bandwidth

Folder: quotas

This scenario will show how to set network bandwidth quotas on principal (user). The scenario requires a listener with SASL_PLAIN authentication.

Bootstrap will initialize a Prometheus server on port 9090 and a Grafana UI with Kafka Quota dashboard on port 3000.

Start scenario:

scripts/bootstrap-quotas.sh

Define network bandwidth quotas for user alice:

  • producer byte rate: 1MB

  • consumer byte rate: 1MB

docker exec broker kafka-configs  --bootstrap-server broker:9092 --command-config /tmp/alice.properties --alter --add-config 'producer_byte_rate=1000000,consumer_byte_rate=1000000' --entity-type users --entity-name alice

Run a producer perf test and see how throttling applies:

docker exec broker kafka-producer-perf-test --topic test --num-records 1000000 --record-size 100 --throughput -1 --producer.config /tmp/alice.properties --producer-props bootstrap.servers=broker:9092

Run a consumer and see how throttling applies:

docker exec -it broker kafka-console-consumer --topic test --bootstrap-server broker:9092 --from-beginning --consumer.config /tmp/alice.properties

You can visualize metrics using the Grafana dashboard for Kafka Quotas fetched from https://github.com/confluentinc/jmx-monitoring-stacks repository

Kafka Quotas Grafana
Figure 1. Kafka Quotas for "alice" as shown in Grafana

Teardown:

scripts/tear-down-quotas.sh

High Availability and Disaster Recovery

Active/Active clusters with Mirror Maker v2

Folder: mirror-maker2

Example of a Mirror Maker v2 configuration Active/Active.

Run the example:

scripts/bootstrap-mm2.sh

A source cluster on port 9092 and a destination cluster on port 9082 will be created.

Create TopicA on source cluster and TopicB on destination cluster:

docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic TopicA --replication-factor 1 --partitions 3
docker exec broker-destination kafka-topics --bootstrap-server broker-destination:9082 --create --topic TopicB --replication-factor 1 --partitions 3

Launch Mirror Maker, both directions:

  • source cluster is identified with name DC-X

  • destination cluster is identified with name DC-Y

  • TopicA will be copied on destination cluster with naming DC-X.TopicA

  • TopicB will be copied on source cluster with naming DC-Y.TopicB

Mirror Maker config:

# specify any number of cluster aliases
clusters=DC-X,DC-Y

# connection information for each cluster
DC-X.bootstrap.servers=broker:9092
DC-Y.bootstrap.servers=broker-destination:9082

# enable and configure individual replication flows
DC-X->DC-Y.enabled = true
DC-X->DC-Y.topics = TopicA
DC-Y->DC-X.enabled = true
DC-Y->DC-X.topics = TopicB

# customize as needed
sync.topic.acls.enabled=true
sync.topic.configs.enabled=true

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
replication.factor = 1
checkpoints.topic.replication.factor=1
heartbeats.topic replication.factor=1
docker exec broker-destination bash -c 'export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/etc/kafka/connect-log4j.properties && connect-mirror-maker /tmp/mm2.properties'

Topic list on source cluster:

docker exec broker kafka-topics --bootstrap-server broker:9092 --list

DC-Y.TopicB
DC-Y.checkpoints.internal
TopicA
__consumer_offsets
heartbeats
mm2-configs.DC-Y.internal
mm2-offsets.DC-Y.internal
mm2-status.DC-Y.internal

Topic list on destination cluster:

docker exec broker-destination kafka-topics --bootstrap-server broker-destination:9082 --list

DC-X.TopicA
DC-X.checkpoints.internal
TopicB
__consumer_offsets
heartbeats
mm2-configs.DC-X.internal
mm2-offsets.DC-X.internal
mm2-status.DC-X.internal

Run Producer Perf Test on source cluster for TopicA:

docker exec broker kafka-producer-perf-test --topic TopicA --num-records 10000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=broker:9092 buffer.memory=67108864 batch.size=8196

Run Producer Perf Test on destination cluster for TopicB:

docker exec broker-destination kafka-producer-perf-test --topic TopicB --num-records 10000 --record-size 100 --throughput -1 --producer-props acks=1 bootstrap.servers=broker-destination:9082 buffer.memory=67108864 batch.size=8196

Verify Topic message size for TopicA (source cluster) and for DCX.TopicA (destination cluster):

docker exec broker ls -ltr /tmp/kraft-combined-logs/TopicA-0
docker exec broker-destination ls -ltr /tmp/kraft-combined-logs/DC-X.TopicA-0
docker exec broker ls -ltr /tmp/kraft-combined-logs/TopicA-1
docker exec broker-destination ls -ltr /tmp/kraft-combined-logs/DC-X.TopicA-1
docker exec broker ls -ltr /tmp/kraft-combined-logs/TopicA-2
docker exec broker-destination ls -ltr /tmp/kraft-combined-logs/DC-X.TopicA-2

Verify Topic message size for TopicB (destination cluster) and for DCY.TopicB (source cluster):

docker exec broker ls -ltr /tmp/kraft-combined-logs/DC-Y.TopicB-0
docker exec broker-destination ls -ltr /tmp/kraft-combined-logs/TopicB-0
docker exec broker ls -ltr /tmp/kraft-combined-logs/DC-Y.TopicB-1
docker exec broker-destination ls -ltr /tmp/kraft-combined-logs/TopicB-1
docker exec broker ls -ltr /tmp/kraft-combined-logs/DC-Y.TopicB-2
docker exec broker-destination ls -ltr /tmp/kraft-combined-logs/TopicB-2

Teardown:

scripts/tear-down-mm2.sh

Monitoring

JMX

Folder: monitoring

Expose JMX port on components and test MBeans.

  • jmx port broker: 9101

Start scenario:

scripts/bootstrap-monitoring.sh

List all MBeans with jmxterm:

# Download jmxterm
$ wget https://github.com/jiaqi/jmxterm/releases/download/v1.0.2/jmxterm-1.0.2-uber.jar
# Execute
$ java -jar jmxterm-1.0.2-uber.jar --url localhost:9101

$> beans

Offline Partitions:

kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9101/jmxrmi --object-name kafka.controller:type=KafkaController,name=OfflinePartitionsCount --one-time true

Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://localhost:9101/jmxrmi.
"time","kafka.controller:type=KafkaController,name=OfflinePartitionsCount:Value"
1688273226797,0

Get Under Replicated Partitions:

kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9101/jmxrmi --object-name kafka.server:name=UnderMinIsrPartitionCount,type=ReplicaManager --one-time true

Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://localhost:9101/jmxrmi.
"time","kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount:Value"
1688272551582,0

Number of Active controllers:

kafka-run-class kafka.tools.JmxTool --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9101/jmxrmi --object-name kafka.controller:type=KafkaController,name=ActiveControllerCount

Trying to connect to JMX url: service:jmx:rmi:///jndi/rmi://localhost:9101/jmxrmi.
"time","kafka.controller:type=KafkaController,name=ActiveControllerCount:Value"
1688273125932,1

Teardown:

scripts/tear-down-monitoring.sh

Observability

Distributed Tracing with OpenTelemetry for Apache Kafka® applications

This example shows how to configure OpenTelemetry Java auto-instrumentation for a stream application enabling distributed tracing.

In this example it is used opentelemetry-java-instrumentation to inject OpenTelemetry auto instrumentation as a JVM agent requiring no modifications at source code to add the traces.

Producers, consumers and streams are part of the supported libraries as documented at OTEL Java documentation

Run OpenTelemetry collector (otlp protocol on port 4317) and Jaeger (on port 16686):

scripts/bootstrap-tracing.sh

Create sum-input-topic and sum-output-topic topics:

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic sum-input-topic --replication-factor 1 --partitions 1
docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic sum-output-topic --replication-factor 1 --partitions 1

Run the stream application with the OpenTelemetry agent:

cd kafka-streams
mvn clean package
cd ..

export OTEL_SERVICE_NAME=stream-sum-service
export OTEL_TRACES_EXPORTER=otlp

java -javaagent:kafka-distributed-tracing/app/opentelemetry-javaagent.jar -Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true -jar kafka-streams/target/kafka-streams-1.2.1.jar

Send messages to sum-input-topic input topics:

docker exec -it broker kafka-console-producer --broker-list broker:9092 --topic sum-input-topic --property "parse.key=true" --property "key.separator=:"
"John":1
"Mark":2
"John":5

Read from sum-output-topic topic:

docker exec -it broker kafka-console-consumer --topic sum-output-topic --bootstrap-server broker:9092 --from-beginning --property print.key=true --property key.separator=" : " --value-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"

Open the JaegerUI on http://localhost:16686, you will have a list of traces from the streaming application.

Traces
Figure 2. List of traces in Jaeger UI

Teardown:

scripts/tear-down-tracing.sh

Performance

Folder: performance

Specs

Important
Results may vary and are dependent on the performances of your host
Important
By default, a container has no resource constraints and can use as much of a given resource as the host’s kernel scheduler allows. Docker provides ways to control how much memory, or CPU a container can use, setting runtime configuration flags.

You can check docker usage during the test using docker stats command:

CONTAINER ID   NAME         CPU %     MEM USAGE / LIMIT     MEM %     NET I/O           BLOCK I/O   PIDS
041a74e554a3   kafka-perf   0.09%     167.2MiB / 30.99GiB   0.53%     694kB / 86.1MB    0B / 0B     61
b693e9dbdfa0   broker2      0.45%     500.4MiB / 30.99GiB   1.58%     88.5MB / 59.8MB   0B / 0B     81
0a97b237f198   broker3      0.41%     486.3MiB / 30.99GiB   1.53%     88.5MB / 60.5MB   0B / 0B     81
4a678630aa03   broker       0.41%     521.2MiB / 30.99GiB   1.64%     88.5MB / 59.8MB   0B / 0B     85
45e9d4500d66   zookeeper    0.13%     98.12MiB / 30.99GiB   0.31%     163kB / 87.8kB    0B / 0B     69

Docker containers can be configured for RAM and CPU limits using:

  • mem_limit – represents the hard memory limits.

  • mem_reservation: represents the soft memory limits.

  • cpus – represents the CPU limit.

  • cpuset – represents the limit on a specific CPU.

  broker:
    image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
    hostname: broker
    container_name: broker
    mem_limit: "1g"
    mem_reservation: "512m"
    cpus: "1"
    cpuset: "2"
    ...

Test machine

Cluster is formed with 3 brokers (9092, 9093, 9094).

1 test machine, named kafka_perf is available to run test scenarios against the kafka cluster.

Start the cluster and kafka_perf machine:

scripts/bootstrap-performance.sh

Trogdor

Tests will run using Trogdor, a test framework for Apache Kafka.

Trogdor requires:

  • a coordinator running on a test machine, it manages multiple agent processes.

  • an agent running on each broker machine. Each agent process is responsible for a single cluster node.

Start Agents:

docker exec broker sh /tmp/trogdor/agent/trogdor-agent.sh
docker exec broker2 sh /tmp/trogdor/agent/trogdor-agent.sh
docker exec broker3 sh /tmp/trogdor/agent/trogdor-agent.sh

Agent logs will be stored in file /home/appuser/trogdor-agent.log on each broker machine.

Start Coordinator:

docker exec kafka-perf sh /tmp/trogdor/coordinator/trogdor-coordinator.sh

Coordinator logs will be stored in file /tmp/trogdor/coordinator/trogdor-coordinator/trogdor-coordinator.log on kafka-perf machine.

Effects with compression

No Compression

Run a ProduceBench scenario with:

  • 10000 messages on each broker for a total of 30000 messages

  • 200 messages per second on each broker for a total of 600 messages per second

  • record size 2k

  • test duration 1 minute

  • no compression

  • producer batch size set to 16k

  • producer linger.ms set to 10

Connect to the Coordinator:

docker exec -it kafka-perf /bin/bash
cd /tmp/trogdor/coordinator/compression/nocompression/

Run the test:

./trogdor-task.sh

Check the status and results:

./trogdor-task-status.sh
Task node-0 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2024-07-09T13:20:43.789Z after 52s
Status: {
  "totalSent" : 10000,
  "averageLatencyMs" : 13.2553,
  "p50LatencyMs" : 13,
  "p95LatencyMs" : 25,
  "p99LatencyMs" : 29,
  "transactionsCommitted" : 0
}

Task node-1 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2024-07-09T13:20:43.521Z after 51s
Status: {
  "totalSent" : 10000,
  "averageLatencyMs" : 12.5799,
  "p50LatencyMs" : 13,
  "p95LatencyMs" : 25,
  "p99LatencyMs" : 29,
  "transactionsCommitted" : 0
}

Task node-2 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2024-07-09T13:20:43.258Z after 50s
Status: {
  "totalSent" : 10000,
  "averageLatencyMs" : 11.5961,
  "p50LatencyMs" : 13,
  "p95LatencyMs" : 24,
  "p99LatencyMs" : 30,
  "transactionsCommitted" : 0
}

Cancel the tasks:

./trogdor-task-cancel.sh

Alternative: you can run kafka-producer-perf-test tool instead of trogdor and test the scenario:

docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 10000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 --print-metrics
10000 records sent, 5425.935974 records/sec (10.35 MB/sec), 829.02 ms avg latency, 1346.00 ms max latency, 837 ms 50th, 1300 ms 95th, 1335 ms 99th, 1345 ms 99.9th.
producer-metrics:batch-size-avg:{client-id=perf-producer-client}                             : 16133.000
producer-metrics:compression-rate-avg:{client-id=perf-producer-client}                       : 1.000
producer-metrics:outgoing-byte-rate:{client-id=perf-producer-client}                         : 637647.064
producer-metrics:request-latency-avg:{client-id=perf-producer-client}                        : 36.831
lz4

Run a RoundTripWorkload scenario with:

  • 10000 messages on each broker for a total of 30000 messages

  • 200 messages per second on each broker for a total of 600 messages per second

  • record size 2k

  • test duration 1 minute

  • lz4 compression

  • producer batch size set to 16k

  • producer linger.ms set to 10

Connect to the Coordinator:

docker exec -it kafka-perf /bin/bash
cd /tmp/trogdor/coordinator/compression/lz4/

Run the test:

./trogdor-task.sh

Check the status and results:

./trogdor-task-status.sh
Task node-0 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2024-07-09T13:22:49.994Z after 50s
Status: {
  "totalSent" : 10000,
  "averageLatencyMs" : 7.9342,
  "p50LatencyMs" : 8,
  "p95LatencyMs" : 17,
  "p99LatencyMs" : 23,
  "transactionsCommitted" : 0
}

Task node-1 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2024-07-09T13:22:50.786Z after 50s
Status: {
  "totalSent" : 10000,
  "averageLatencyMs" : 8.1347,
  "p50LatencyMs" : 8,
  "p95LatencyMs" : 18,
  "p99LatencyMs" : 24,
  "transactionsCommitted" : 0
}

Task node-2 of type org.apache.kafka.trogdor.workload.ProduceBenchSpec is DONE. FINISHED at 2024-07-09T13:22:51.543Z after 50s
Status: {
  "totalSent" : 10000,
  "averageLatencyMs" : 8.3103,
  "p50LatencyMs" : 9,
  "p95LatencyMs" : 18,
  "p99LatencyMs" : 23,
  "transactionsCommitted" : 0
}

Cancel the tasks:

./trogdor-task-cancel.sh

Alternative: you can run kafka-producer-perf-test tool instead of trogdor and test the scenario:

docker exec kafka-perf sh kafka-producer-perf-test.sh --topic topic-perf --num-records 10000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=broker:9092,broker2:9093,broker3:9094 compression.type=lz4 --print-metrics
10000 records sent, 8045.052293 records/sec (15.34 MB/sec), 329.08 ms avg latency, 572.00 ms max latency, 357 ms 50th, 538 ms 95th, 558 ms 99th, 572 ms 99.9th
producer-metrics:batch-size-avg:{client-id=perf-producer-client}                             : 14134.782
producer-metrics:compression-rate-avg:{client-id=perf-producer-client}                       : 1.001
producer-metrics:outgoing-byte-rate:{client-id=perf-producer-client}                         : 651333.258
producer-metrics:request-latency-avg:{client-id=perf-producer-client}                        : 19.609

Teardown:

scripts/tear-down-performance.sh

Proxy

Folder: proxy

This sample program written in golang shows how to create a TCP proxy for Apache Kafka®.

Important
this proxy is not intended to be used in production

Run the components:

scripts/bootstrap.sh

Launch the proxy:

go run proxy/kafkaproxy.go

Proxy will listen to localhost, on port 1999 and proxy requests to broker, on port 9092

Test the proxy with some commands:

kafka-producer-perf-test --topic topic-perf --num-records 1000000 --record-size 2000 --throughput -1 --producer-props bootstrap.servers=localhost:1999 --print-metrics

Teardown:

scripts/tear-down.sh

UDP Proxy

Folder: udp-proxy

This example will show how to send UDP packets to a kafka topic.

Run UDP Proxy:

start.sh <number of receivers> <bind address> <bind port> <topic name>

example:

start.sh 5 230.0.0.0 4446 telemetry

This will start 5 UDP Multicast receivers listening on 230.0.0.0 and port 4446 publishing to kafka topic telemetry.

Simulate a stress test:

# send 1000 bytes 10 times
python udp_stress_client.py 1000 10

Teardown:

stop.sh

Large objects

Claim Check Pattern

Folder: claim-check

In this example, kafka messages will be produced to a topic named items. Messages contain a field named url with a reference for the object stored in MinIO.

This pattern is known as Claim Check; you can find more details on: https://developer.confluent.io/patterns/event-processing/claim-check/

Run the components:

  • MinIO will be created on port 9000 and 9001

scripts/bootstrap.sh

Create a items topic:

docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic items --replication-factor 1 --partitions 1

Run the producer; object will be first stored on MinIO and then sent to a kafka topic named items with the object URI:

cd claim-check
mvn clean compile && mvn exec:java -Dexec.mainClass="org.hifly.kafka.demo.claimcheck.Runner"

Teardown:

scripts/tear-down-claim-check.sh