Kafka Streams POC
This POC was built by following the Kafka Streams tutorial.
- Install Kafka
Create a kafka user to run Kafka
sudo useradd kafka -m
sudo passwd kafka
sudo adduser kafka sudo
Switch to the kafka user
su -l kafka
Download Kafka
wget https://downloads.apache.org/kafka/2.5.0/kafka_2.12-2.5.0.tgz
Extract file
mkdir ~/kafka && cd ~/kafka
tar -xvzf ~/kafka_2.12-2.5.0.tgz --strip 1
Configure server
nano ~/kafka/config/server.properties
Add the following line to enable topic deletion
delete.topic.enable = true
You can also use the docker-compose file in the repository root to start a Kafka instance, using the following command:
docker-compose up
Create a systemd unit for ZooKeeper
sudo nano /etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
Create a systemd unit for Kafka
sudo nano /etc/systemd/system/kafka.service
[Service]
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
Start Kafka
sudo systemctl start kafka
Check that Kafka started correctly
sudo journalctl -u kafka
- Create Topics
Create streams-plaintext-input topic
~/kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Create streams-pipe-output topic
~/kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-pipe-output
Check that the topics were created
~/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
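The same check can be scripted. The following is a minimal sketch and not part of the POC: the variables KAFKA_TOPICS and BOOTSTRAP and the functions topic_exists and missing_topics are hypothetical names, and it assumes that kafka-topics.sh --list prints one topic name per line.

```shell
# Assumed defaults; KAFKA_TOPICS and BOOTSTRAP are hypothetical variable names.
KAFKA_TOPICS="${KAFKA_TOPICS:-$HOME/kafka/bin/kafka-topics.sh}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Succeeds (exit status 0) if the named topic appears in the --list output.
topic_exists() {
    "$KAFKA_TOPICS" --bootstrap-server "$BOOTSTRAP" --list | grep -qxF -- "$1"
}

# Prints each topic from the argument list that is missing on the broker.
missing_topics() {
    for topic in "$@"; do
        topic_exists "$topic" || echo "$topic"
    done
}
```

For example, `missing_topics streams-plaintext-input streams-pipe-output` prints nothing when both topics exist.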
This POC has 3 projects, as described below:
The first project reads messages from streams-plaintext-input and writes each message unchanged to the streams-pipe-output topic. To test it, run:
gradle run
After that, you can use the Kafka console tools to publish and check messages, as below:
To publish:
~/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
To listen:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-pipe-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
The second project reads messages from streams-plaintext-input, splits each message into words, and writes the words to the streams-linesplit-output topic.
Before you run it, create the output topic in Kafka, using the command below:
~/kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-linesplit-output
To test, you should execute:
gradle run
After that, you can use the Kafka console tools to publish and check messages, as below:
To publish:
~/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
To listen:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-linesplit-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
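To get a feel for what should arrive on streams-linesplit-output, the transformation can be approximated in the shell. This is only a sketch: it assumes the project splits lines on runs of non-word characters, as the Kafka Streams tutorial does, and split_words is a hypothetical helper that is not part of the POC.

```shell
# Sketch only: emulates the line split with tr, it does not call the Java project.
split_words() {
    # -c complements the set (everything that is NOT a letter, digit or underscore),
    # -s squeezes repeats, so each run of such characters becomes one newline.
    tr -cs '[:alnum:]_' '\n'
}

# One input line produces one output line per word.
printf 'all streams lead to kafka\n' | split_words
```

Each printed line corresponds to one record that the consumer should show for that input message.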
The third project counts the occurrences of each word sent to the streams-plaintext-input topic. Before you run it, create the output topic in Kafka. Because this topic is a changelog stream, create it with log compaction enabled, using the command below:
~/kafka/bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
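Log compaction retains at least the most recent record for each key, which suits a topic where every record is the latest count for a word. A rough shell sketch of that effect on tab-separated key/value lines follows; compact_log is a hypothetical helper, and the sketch ignores timing, log segments and tombstones.

```shell
# Sketch only: keeps, for every key, the record at its highest line number
# (standing in for the highest offset) and preserves the original order.
compact_log() {
    awk -F '\t' '
        { key[NR] = $1; value[NR] = $2; latest[$1] = NR }
        END {
            for (i = 1; i <= NR; i++)
                if (latest[key[i]] == i) print key[i] "\t" value[i]
        }'
}

# The older record for "kafka" is dropped; the newer one survives.
printf 'kafka\t1\nstreams\t1\nkafka\t2\n' | compact_log
```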
To test, you should execute:
gradle run
After that, you can use the Kafka console tools to publish and check messages, as below:
To publish:
~/kafka/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
To listen:
~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
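The records on streams-wordcount-output are count updates rather than final totals. The sketch below approximates the sequence to expect, assuming the project lowercases each line and splits it on non-word characters as in the Kafka Streams tutorial. count_updates is a hypothetical helper, and with record caching enabled Kafka Streams may emit fewer intermediate updates than shown here.

```shell
# Sketch only: prints "word<TAB>running count" for every word occurrence.
count_updates() {
    tr '[:upper:]' '[:lower:]' |
        tr -cs '[:alnum:]_' '\n' |
        awk 'NF { count[$0]++; printf "%s\t%d\n", $0, count[$0] }'
}

# "hello" appears twice, so it is reported first with 1 and later with 2.
printf 'Hello Kafka\nhello streams\n' | count_updates
```

The last update printed for a word carries its current total, which is also the record that log compaction would keep for that key.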