- Import the apt key:
wget --quiet --output-document - https://packages.confluent.io/deb/5.2/archive.key | sudo apt-key add -
- Add Confluent repository:
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.2/ stable main"
- Install Kafka Confluent and its dependencies:
sudo apt-get update && sudo apt-get install --quiet --yes openjdk-8-jdk confluent-community-2.12
- Configure the host entries:
# /etc/hosts
<PRIVATE_IP_BROKER_1> zoo-1 kafka-1
<PRIVATE_IP_BROKER_2> zoo-2 kafka-2
<PRIVATE_IP_BROKER_3> zoo-3 kafka-3
- Configure Zookeeper:
# /etc/kafka/zookeeper.properties
- Configure Zookeeper ID file:
# /var/lib/zookeeper/myid
[ZOOKEEPER_ID] # ex. 1, 2, 3 and so on.
- Enable Zookeeper service:
sudo systemctl enable confluent-zookeeper
- Start Zookeeper service:
sudo systemctl start confluent-zookeeper
- Configure Kafka:
# /etc/kafka/server.properties
broker.id=[BROKER_ID] # ex. 1, 2, 3 and so on.
- Enable Kafka service:
sudo systemctl enable confluent-kafka
- Start Kafka service:
sudo systemctl start confluent-kafka
- Check the services status:
sudo systemctl status confluent*
- Check broker cluster listing topics:
kafka-topics \
--bootstrap-server kafka-[BROKER_ID]:9092 \
- Create the topic inventory_purchases:
kafka-topics \
--bootstrap-server localhost:9092 \
--create \
--topic inventory_purchases \
--replication-factor 3 \
--partitions 6
- Create the producer and add some messages:
kafka-console-producer \
--broker-list localhost:9092 \
--topic inventory_purchases
> product: apples, quantity: 5
> product: lemons, quantity: 7
> product: bananas, quantity: 3
- Create the consumer:
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic inventory_purchases \
- Create a Consumer Group, with one consumer:
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic inventory_purchases \
--group 1 > /tmp/group1_consumer1.txt
- Create another Consumer Group, with two consumers:
# In one session
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic inventory_purchases \
--group 2 > /tmp/group2_consumer1.txt
# In another session
kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic inventory_purchases \
--group 2 > /tmp/group2_consumer2.txt
- Create the topic streams-input-topic:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic streams-input-topic \
--replication-factor 3 \
--partitions 3
- Create the topic streams-output-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic streams-output-topic \
--replication-factor 3 \
--partitions 3
- Start Stream Java project:
./gradlew run
- Create a consumer for streams-output-topic:
kafka-console-consumer \
--bootstrap-server localhost:39092 \
--topic streams-output-topic \
--property print.key=true
- Create a producer for streams-input-topic:
kafka-console-producer \
--broker-list localhost:29092 \
--topic streams-input-topic \
--property parse.key=true \
--property key.separator=:
# To add messages
- Create the topic stateless-transformations-input-topic:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic stateless-transformations-input-topic \
--replication-factor 3 \
--partitions 3
- Create the topic stateless-transformations-output-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic stateless-transformations-output-topic \
--replication-factor 3 \
--partitions 3
- Start StatelessTransformations Java project:
# streams
./gradlew runStatelessTransformations
- Create a consumer for stateless-transformations-output-topic:
kafka-console-consumer \
--bootstrap-server localhost:39092 \
--topic stateless-transformations-output-topic \
--property print.key=true
- Create a producer for stateless-transformations-input-topic:
kafka-console-producer \
--broker-list localhost:29092 \
--topic stateless-transformations-input-topic \
--property parse.key=true \
--property key.separator=:
# To add messages
- Create the topic aggregations-input-topic:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic aggregations-input-topic \
--replication-factor 3 \
--partitions 3
- Create the topic aggregations-output-character-count-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic aggregations-output-character-count-topic \
--replication-factor 3 \
--partitions 3
- Create the topic aggregations-output-count-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic aggregations-output-count-topic \
--replication-factor 3 \
--partitions 3
- Create the topic aggregations-output-reduce-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic aggregations-output-reduce-topic \
--replication-factor 3 \
--partitions 3
- Start AggregationsMain Java project:
# streams
./gradlew runAggregations
- Create a consumer for aggregations-output-character-count-topic:
kafka-console-consumer \
--bootstrap-server localhost:39092 \
--topic aggregations-output-character-count-topic \
--property print.key=true \
--property value.deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
- Create a consumer for aggregations-output-count-topic:
kafka-console-consumer \
--bootstrap-server localhost:39092 \
--topic aggregations-output-count-topic \
--property print.key=true \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
- Create a consumer for aggregations-output-reduce-topic:
kafka-console-consumer \
--bootstrap-server localhost:39092 \
--topic aggregations-output-reduce-topic \
--property print.key=true
- Create a producer for aggregations-input-topic:
kafka-console-producer \
--broker-list localhost:29092 \
--topic aggregations-input-topic \
--property parse.key=true \
--property key.separator=:
# To add messages
- Create the topic joins-input-topic-left:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic joins-input-topic-left \
--replication-factor 3 \
--partitions 3
- Create the topic joins-input-topic-right:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic joins-input-topic-right \
--replication-factor 3 \
--partitions 3
- Create the topic inner-join-output-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic inner-join-output-topic \
--replication-factor 3 \
--partitions 3
- Create the topic left-join-output-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic left-join-output-topic \
--replication-factor 3 \
--partitions 3
- Create the topic outer-join-output-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic outer-join-output-topic \
--replication-factor 3 \
--partitions 3
- Start JoinsMain Java project:
# streams
./gradlew runJoins
- Create a consumer for inner-join-output-topic:
kafka-console-consumer \
--bootstrap-server localhost:29092 \
--topic inner-join-output-topic \
--property print.key=true
- Create a consumer for left-join-output-topic:
kafka-console-consumer \
--bootstrap-server localhost:39092 \
--topic left-join-output-topic \
--property print.key=true
- Create a consumer for outer-join-output-topic:
kafka-console-consumer \
--bootstrap-server localhost:49092 \
--topic outer-join-output-topic \
--property print.key=true
- Create a producer for joins-input-topic-left:
kafka-console-producer \
--broker-list localhost:29092 \
--topic joins-input-topic-left \
--property parse.key=true \
--property key.separator=:
# To add messages
- Create a producer for joins-input-topic-right:
kafka-console-producer \
--broker-list localhost:29092 \
--topic joins-input-topic-right \
--property parse.key=true \
--property key.separator=:
# To add messages
- Create the topic windowing-input-topic:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic windowing-input-topic \
--replication-factor 3 \
--partitions 3
- Create the topic windowing-output-topic:
kafka-topics \
--bootstrap-server localhost:39092 \
--create \
--topic windowing-output-topic \
--replication-factor 3 \
--partitions 3
- Start WindowingMain Java project:
# streams
./gradlew runWindowing
- Create a consumer for windowing-output-topic:
kafka-console-consumer \
--bootstrap-server localhost:29092 \
--topic windowing-output-topic \
--property print.key=true
- Create a producer for windowing-input-topic:
kafka-console-producer \
--broker-list localhost:29092 \
--topic windowing-input-topic \
--property parse.key=true \
--property key.separator=:
# To add messages
# after 10 secods
- Change the cluster configuration:
kafka-configs \
--bootstrap-server localhost:29092 \
--alter \
--entity-type brokers \
--entity-default \
--add-config retention.ms=259200000
- Change a topic configuration:
kafka-configs \
--bootstrap-server localhost:29092 \
--alter \
--entity-type topics \
--entity-name inventory-purchases \
--add-config unclean.leader.election.enable=true, \
- Create the topic count-topic:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic count-topic \
--replication-factor 2 \
--partitions 2
- Start ProducerMain Java project:
# producer-and-consumer
./gradlew runProducer
- Create the topic first-topic:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic first-topic \
--replication-factor 2 \
--partitions 2
- Create the topic second-topic:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic second-topic \
--replication-factor 2 \
--partitions 2
# producer-and-consumer
./gradlew runConsumer
- Create a producer for first-topic:
kafka-console-producer \
--broker-list localhost:29092 \
--topic first-topic \
--property parse.key=true \
--property key.separator=:
# To add messages
- Create a producer for second-topic:
kafka-console-producer \
--broker-list localhost:29092 \
--topic second-topic \
--property parse.key=true \
--property key.separator=:
# To add messages
- Enable schema-registry and kafka-rest services:
sudo systemctl enable confluent-schema-registry confluent-kafka-rest
- Start schema-registry and kafka-rest services:
sudo systemctl start confluent-schema-registry confluent-kafka-rest
- Request to produce records:
curl \
--request POST \
--header 'Content-Type: application/vnd.kafka.json.v2+json' \
--data-binary @records.json \
- Request body:
// records.json
"records": [
"key": "<KEY>",
"value": "VALUE"
"key": "<KEY>",
"value": "VALUE"
- Create a consumer and consumer-instance:
curl \
--request POST \
--header 'Content-Type: application/vnd.kafka.json.v2+json' \
--data-binary @consumer-instance.json \
- Request body:
// consumer-instance.json
"format": "json",
"auto.offset.reset": "earliest"
- Subscribe consumer instance to a topic:
curl \
--request POST \
--header 'Content-Type: application/vnd.kafka.json.v2+json' \
--data-binary @topic-subscription.json \
- Request body:
// topic-subscription.json
"topics": ["<TOPIC_NAME>"]
- To consume messages:
curl \
--request GET \
--header 'Content-Type: application/vnd.kafka.json.v2+json' \
- To delete a consumer instance:
curl \
--request DELETE \
--header 'Content-Type: application/vnd.kafka.json.v2+json' \
Compatibility types:
- BACKWARD (default)
- BACKWARD_TRANSITIVE (all backward)
- FORWARD (current)
- FORWARD_TRANSITIVE (all forward)
- FULL (current)
Source: Confluent Documentation
Source to Sink connector.
- Start Kafka Connect service:
sudo systemctl start confluent-kafka-connect
- Create input.txt file, with some data:
touch input.txt
- Create output.txt file, with following permissions:
touch output.txt && chmod 777 output.txt
- Create a source connector:
curl \
--request POST \
--header 'Accept: */*' \
--header 'Content-Type: application/json' \
--data-binary @source-connector.json \
- Request body:
// source-connector.json
"name": "file_source_connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"topics": "connector-topic",
"file": "/home/user/input.txt",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
- Create a sink connector:
curl \
--request POST \
--header 'Accept: */*' \
--header 'Content-Type: application/json' \
--data-binary @sink-connector.json \
- Request body:
// sink-connector.json
"name": "file_sink_connector",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics": "connector-topic",
"file": "/home/user/output.txt",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
- Check the created connector status:
curl --request GET http://localhost:8083/connectors/<CONNECTOR_NAME>/status
- To see the connector metadata:
curl --request GET http://localhost:8083/connectors/<CONNECTOR_NAME>
- To delete a connector:
curl --request DELETE http://localhost:8083/connectors/<CONNECTOR_NAME>
- Create a Certificate Authority (CA):
openssl req -new \
-x509 \
-keyout ca-key \
-out ca-cert \
-days 365 \
-subj "/C=US/ST=Texas/L=Keller/O=Linux Academy/OU=Content/CN=CCDAK"
- Create a client keystore:
keytool \
-keystore client.truststore.jks \
-alias CARoot \
-import \
-file ca-cert
- Create a server keystore:
keytool \
-keystore server.truststore.jks \
-alias CARoot \
-import \
-file ca-cert
- Create a keystore for each broker server:
keytool \
-keystore kafka-[BROKER_ID].truststore.jks \
-alias localhost \
-validity 365 \
-genkey \
-keyalg RSA \
-dname "CN=<PUBLIC_DNS_NAME>, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown" \
-ext san=dns:kafka-[BROKER_ID],dns:localhost,ip:,ip:<PRIVATE_IP_ADDR>
- Generate a certificate for each broker server:
keytool \
-keystore kafka-[BROKER_ID].keystore.jks \
-alias localhost \
-certreq \
-file kafka-[BROKER_ID]-cert-file
- Create a configuration file for certificate signing:
echo subjectAltName = DNS:kafka-[BROKER_ID],DNS:localhost,IP:,IP:<PRIVATE_IP_ADDR> >> kafka-[BROKER_ID]-extfile.conf
- Sign certificates:
openssl x509 \
-req \
-CA ca-cert \
-CAkey ca-key \
-in kafka-[BROKER_ID]-cert-file \
-out kafka-[BROKER_ID]-cert-signed \
-days 365 \
-CAcreateserial \
-extfile kafka-[BROKER_ID]-extfile.conf
- Import CA certificate to each broker keystore:
keytool \
-keystore kafka-[BROKER_ID].keystore.jks \
-alias CARoot \
-import \
-file ca-cert
- Import signed certificates to each broker keystore:
keytool \
-keystore kafka-[BROKER_ID].keystore.jks \
-alias localhost \
-import \
-file kafka-[BROKER_ID]-cert-signed
- Copy keystores to proper servers:
# same server
cp kafka-1.keystore.jks server.truststore.jks /home/<USER>
# remote servers
scp [ kafka-2, kafka-3 ].keystore.jks server.truststore.jks <USER>@[ kafka-2, kafka-3 ]:/home/<USER>
- Move keystores to proper directories:
sudo mkdir --parents /var/private/ssl
sudo mv server.truststore.jks /var/private/ssl
sudo mv client.truststore.jks /var/private/ssl
sudo mv kafka-[BROKER_ID].keystore.jks /var/private/ssl/server.keystore.jks
sudo chown --recursive root:root /var/private/ssl
- Configure each broker configuration:
# /etc/kafka/server.properties
# advertised.listeners=PLAINTEXT://kafka-[BROKER_ID]:9092
- Restart Kafka service:
sudo systemctl restart confluent-kafka
- Create a properties file:
# client-ssl.properties
- Use configuration file (using secure 9093 port):
kafka-console-consumer \
--bootstrap-server kafka-[BROKER_ID]:9093 \
--topic tls-test \
--from-beginning \
--consumer.config client-ssl.properties
- Create a keystore for client certificates:
keytool \
-keystore client.keystore.jks \
-alias kafkauser \
-validity 365 \
-genkey \
-keyalg RSA \
-dname "CN=kafkauser, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown"
- Generate a client certificate:
keytool \
-keystore client.keystore.jks \
-alias kafkauser \
-certreq \
-file client-cert-file
- Sign certificate:
openssl x509 \
-req \
-CA ca-cert \
-CAkey ca-key \
-in client-cert-file \
-out client-cert-signed \
-days 365 \
- Import CA certificate to client keystore:
keytool \
-keystore client.keystore.jks \
-alias CARoot \
-import \
-file ca-cert
- Import signed certificate to client keystore:
keytool \
-keystore client.keystore.jks \
-alias kafkauser \
-import \
-file client-cert-signed
- Move keystore to proper directory:
sudo mv client.keystore.jks /var/private/ssl
sudo chown root:root /var/private/ssl/client.keystore.jks
- Configure each broker configuration:
# /etc/kafka/server.properties
- Restart Kafka service:
sudo systemctl restart confluent-kafka
- Update properties file:
# client-ssl.properties
- Use configuration file (using secure 9093 port):
kafka-console-consumer \
--bootstrap-server kafka-[BROKER_ID]:9093 \
--topic tls-test \
--from-beginning \
--consumer.config client-ssl.properties
An Access Control List consists of:
- Principal - user.
- Allow/Deny
- Operation - action (read, write, all).
- Host - IP address(es).
- Resource Pattern - matches one or more resources.
- Enable ACL in each broker:
# /etc/kafka/server.properties
- Restart Kafka service:
sudo systemctl restart confluent-kafka
- To create an ACL:
kafka-acls \
--authorizer-properties zookeeper.connect=localhost:2181 \
--add \
--allow-principal User:[USER_NAME] \
--operation all \
--topic [TOPIC_NAME]
- To list topic's ACLs:
kafka-acls \
--authorizer-properties zookeeper.connect=localhost:2181 \
--list \
--topic [TOPIC_NAME]
- To remove an ACL from a topic:
kafka-acls \
--authorizer-properties zookeeper.connect=localhost:2181 \
--remove \
--topic [TOPIC_NAME]
- Create a producer with JMX enabled:
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" \
kafka-console-producer \
--broker-list localhost:29092 \
--topic monitor-test
> hello world
> ...
- Create a consumer with JMX enabled:
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false -Djava.rmi.server.hostname=localhost" \
kafka-console-consumer \
--bootstrap-server localhost:29092 \
--topic monitor-test \
- Start graphical JConsole:
sudo jconsole
- response-rate: average acknowledgements per seconds (global and per broker).
- request-rate: average requests per seconds (global and per broker).
- request-latency-avg: average request latency in miliseconds (only per broker).
- outgoing-byte-rate: bytes per seconds (global and per broker).
- io-wait-time-ns-avg: average time socket ready read/write in nanoseconds (only global).
- records-lag-max: maximum record lag. How far the consumer is behind producers.
- bytes-consumed-rate: rate of bytes consumed per second.
- records-consumed-rate: rate of records consumed per second.
- fetch-rate: fetch requests per second.
acks: Determines when the broker will acknowledge the record.
- 0: Producer will not wait for acknowledgements from ther server.
- 1: Record will be acknowledged when the leader writes the record to the disk.
- all / -1: Record will be acknowledged only when the leader and all replicas have written the record to their disks.
retries: Number of times to retry a record if there is a transient error. (max.in.flight.requests.per.connection to 1, to records not appear in different order).
batch.size: Maximum number of bytes in a batch.
fetch.min.bytes: The minimum amount of data to fetch in a request.
heartbeat.interval. ms: How often to send heartbeats to the consumer coordinator.
auto.offset.reset: What to do when consumer has no initial offset.
- Latest: Start at the latest record (default).
- Earliest: Start at the earliest record.
- none: Throw an exception when there is no existing offset data.
enable.auto.commit: Periodically commit the current offset in the background.
- Configure KSQL Server:
# /etc/ksql/ksql-server.properties
- Enable KSQL service:
sudo systemctl enable confluent-ksql
- Start KSQL service:
sudo systemctl start confluent-ksql
- Create the topic ksql-test:
kafka-topics \
--bootstrap-server localhost:29092 \
--create \
--topic ksql-test \
--partitions 1 \
--replication-factor 1
- Create a producer for ksql-test:
kafka-console-producer \
--broker-list localhost:29092 \
--topic ksql-test \
--property parse.key=true \
--property key.separator=:
# To add messages
- To start KSQL console:
sudo ksql http://localhost:28088
- To change a configuration:
SET 'auto.offset.reset' = 'earliest';
- List topics:
- Show topic's data:
- Create a stream from a topic:
CREATE STREAM ksql_test_stream (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED');
- Create a table from a topic:
CREATE TABLE ksql_test_table (employee_id INTEGER, name VARCHAR, vacation_days INTEGER) WITH (kafka_topic='ksql-test', value_format='DELIMITED', key='employee_id');
- Select from a stream or table:
SELECT * FROM ksql_test_stream EMIT CHANGES;
- Grouping from a stream:
SELECT SUM(vacation_days) FROM ksql_test_stream GROUP BY employee_id EMIT CHANGES;
- Conditional streaming:
CREATE STREAM member_signups
(firstname VARCHAR,
lastname VARCHAR,
email_notifications BOOLEAN)
WITH (KAFKA_TOPIC='member_signups',
CREATE STREAM member_signups_email AS
SELECT * FROM member_signups WHERE email_notifications=true;
- Joining streamings:
CREATE STREAM member_signups
(firstname VARCHAR,
lastname VARCHAR)
WITH (KAFKA_TOPIC='member_signups',
CREATE STREAM member_contact
(email VARCHAR)
WITH (KAFKA_TOPIC='member_contact',
CREATE STREAM member_email_list AS
SELECT member_signups.firstname, member_signups.lastname, member_contact.email
FROM member_signups
INNER JOIN member_contact WITHIN 365 DAYS ON member_signups.rowkey = member_contact.rowkey;