This exercise adds SASL/PLAIN authentication to Kafka and BASIC authentication to the HTTP-based services, building on the tls-noauth exercise.
In the Kafka resource you will notice the following addition, which references the secret containing a JSON file with the allowed users. Also note that authentication is enforced at the listener level.
```yaml
spec:
  ...
  listeners:
    internal:
      authentication:
        type: plain
        jaasConfig:
          secretRef: kafka-internal-users-jaas
      tls:
        enabled: true
```
For other resources, a similar approach is followed, but authentication applies to the whole service.
```yaml
spec:
  ...
  authentication:
    type: basic
    basic:
      secretRef: basic-users-sr
```
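For instance, in the SchemaRegistry resource this block sits at the top level of `spec`, next to `tls`. The sketch below is illustrative only, not the full resource used in this exercise:

```yaml
apiVersion: platform.confluent.io/v1beta1
kind: SchemaRegistry
metadata:
  name: schemaregistry
  namespace: confluent
spec:
  replicas: 1
  # BASIC auth for the whole service, users taken from the referenced secret
  authentication:
    type: basic
    basic:
      secretRef: basic-users-sr
  # Let CFK auto-generate the server certificate from the CA pair secret
  tls:
    autoGeneratedCerts: true
```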
For the filenames inside the secrets and their format, refer to the CFK Authentication Documentation.
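As an illustration only (the authoritative filenames and formats are in the CFK docs), the three kinds of files typically look like this; all usernames and passwords below are made-up placeholders:

```shell
# plain-users.json: JSON map of allowed SASL/PLAIN users to their passwords (illustrative)
cat > plain-users.json <<'EOF'
{
  "kafka": "kafka-secret",
  "sr": "sr-password"
}
EOF

# plain.txt: the username/password a client uses to authenticate to Kafka (illustrative)
cat > plain.txt <<'EOF'
username=sr
password=sr-password
EOF

# basic.txt: allowed users for a BASIC-auth HTTP service, as "user: password,role" (illustrative)
cat > basic.txt <<'EOF'
sr-user: sr-password,admin
EOF
```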
NOTE: For simplicity we will use mTLS for ZooKeeper<>Kafka authentication.
As in the tls-noauth exercise, we will create a CA signer certificate and key, and let CFK create the certificates for each service.
```shell
mkdir generated

openssl genrsa -out generated/InternalCAkey.pem 2048

openssl req -x509 -new -nodes \
  -key generated/InternalCAkey.pem \
  -days 365 \
  -out generated/InternalCAcert.pem \
  -subj "/C=ES/ST=VLC/L=VLC/O=Demo/OU=GCP/CN=InternalCA"

kubectl create secret tls ca-pair-sslcerts \
  --cert=generated/InternalCAcert.pem \
  --key=generated/InternalCAkey.pem \
  --namespace confluent
```
For each service we will prepare a secret with the list of allowed users (including a "service" user), as well as secrets with the login/password that each service will use to authenticate to the others.
The secrets needed are summarized in the following table; additional columns have been provided to help set up the commands that create them.
Secret | Secret filename | Purpose | Content
---|---|---|---
kafka-internal-users | plain-users.json | Allowed users to Kafka | file
basic-mr-kafka-secret | plain.txt | user/pass to Kafka for metrics | file
basic-users-sr | basic.txt | Allowed users to SR | file
basic-users-connect | basic.txt | Allowed users to Connect | file
basic-users-ksqldb | basic.txt | Allowed users to ksqldb | file
basic-users-c3 | basic.txt | Allowed users to C3 | file
basic-sr-kafka-secret | plain.txt | user/pass to Kafka for SR | file
basic-connect-kafka-secret | plain.txt | user/pass to Kafka for Connect | file
basic-ksqldb-kafka-secret | plain.txt | user/pass to Kafka for ksqldb | file
basic-ksqldb-sr-secret | plain.txt | user/pass to SR for ksqldb | file
basic-c3-kafka-secret | plain.txt | user/pass to Kafka for C3 | file
basic-c3-sr-secret | plain.txt | user/pass to SR for C3 | file
basic-c3-connect-secret | plain.txt | user/pass to Connect for C3 | file
basic-c3-ksqldb-secret | plain.txt | user/pass to ksqldb for C3 | file
Using the above table you can create the needed secrets with the command below
```shell
kubectl create secret generic <secret> \
  --from-file=<secret_filename>=<file> \
  --namespace confluent
```
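As a convenience sketch (not part of the exercise), a small loop can print the create-secret command for each table row; the local file paths on the right are hypothetical:

```shell
# Print one kubectl command per "secret|secret filename|local file" row.
# The rows and local paths below are illustrative placeholders.
while IFS='|' read -r secret filename file; do
  echo "kubectl create secret generic ${secret} --from-file=${filename}=${file} --namespace confluent"
done <<'EOF'
kafka-internal-users|plain-users.json|secrets/plain-users.json
basic-users-sr|basic.txt|secrets/basic-users-sr.txt
EOF
```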
For this exercise the secret contents are available in the secrets folder, and for deployment convenience a secrets.yml has been prepared that bundles the equivalent of the commands above. See secrets/README.md.
```shell
kubectl apply -f secrets/secrets.yml
```
To deploy the platform run:
```shell
kubectl apply -f cp-platform.yaml
```
For each service you can perform the following tests...
The descriptor file `test-kafka-app.yaml` creates a topic named `app-topic`, a secret named `kafka-client-tls-properties` holding a properties file to connect to Kafka from inside K8s (see `internal-client.properties`), and two pods: one that produces 10,000 records at 1-second intervals, and another that consumes from the same topic.
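For orientation, a SASL_SSL client properties file for this setup generally contains entries like the following; the credentials and truststore path are placeholders, so check `internal-client.properties` in the exercise for the real values:

```properties
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
# Placeholder credentials; the real ones must match an allowed user in plain-users.json
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="kafka" password="kafka-secret";
# Placeholder truststore containing the InternalCA certificate
ssl.truststore.location=/mnt/sslcerts/truststore.p12
ssl.truststore.password=changeme
```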
```shell
kubectl apply -f test-kafka-app.yaml
```
You can follow the logs of each pod once running to see the producer and consumer apps in action
```shell
kubectl logs -f producer-app
kubectl logs -f consumer-app
```
To tear down the app run the command below; alternatively, you can leave it running until you test Confluent Control Center (C3) below.
```shell
kubectl delete -f test-kafka-app.yaml
```
This is a simple check to confirm the Schema Registry REST endpoint works.
```shell
## CHECK GENERATED CERTIFICATE
kubectl exec -it kafka-0 -- \
  openssl s_client -connect schemaregistry.confluent.svc.cluster.local:8081 </dev/null 2>/dev/null | \
  openssl x509 -noout -text | \
  grep -E '(Issuer: | DNS:)'

## CHECK THE SERVICE - NO USER
kubectl exec -it kafka-0 -- curl -k https://schemaregistry.confluent.svc.cluster.local:8081/schemas/types

## CHECK THE SERVICE - AUTHENTICATING
kubectl exec -it kafka-0 -- curl -k -u sr-user:sr-password https://schemaregistry.confluent.svc.cluster.local:8081/schemas/types
```
This is a simple check to confirm the Connect REST endpoint works.
```shell
## CHECK GENERATED CERTIFICATE
kubectl exec -it kafka-0 -- \
  openssl s_client -connect connect.confluent.svc.cluster.local:8083 </dev/null 2>/dev/null | \
  openssl x509 -noout -text | \
  grep -E '(Issuer: | DNS:)'

## CHECK THE SERVICE - NO USER
kubectl exec -it kafka-0 -- curl -k https://connect.confluent.svc.cluster.local:8083/

## CHECK THE SERVICE - AUTHENTICATING
kubectl exec -it kafka-0 -- curl -k -u connect-user:connect-password https://connect.confluent.svc.cluster.local:8083/
```
This is a simple check to confirm the ksqldb REST endpoint works.
```shell
## CHECK GENERATED CERTIFICATE
kubectl exec -it kafka-0 -- \
  openssl s_client -connect ksqldb.confluent.svc.cluster.local:8088 </dev/null 2>/dev/null | \
  openssl x509 -noout -text | \
  grep -E '(Issuer: | DNS:)'

## CHECK THE SERVICE - NO USER
kubectl exec -it kafka-0 -- curl -k https://ksqldb.confluent.svc.cluster.local:8088/info

## CHECK THE SERVICE - AUTHENTICATING
kubectl exec -it kafka-0 -- curl -k -u ksqldb-user:ksqldb-password https://ksqldb.confluent.svc.cluster.local:8088/info
```
You need to port-forward port 9021 from the C3 pod, either using the confluent kubectl plugin:
```shell
## This will also open your browser automatically
kubectl confluent dashboard controlcenter
```
Or with kubectl port-forward:
```shell
kubectl port-forward controlcenter-0 9021:9021 -n confluent
## Open your browser and navigate to https://localhost:9021
```
NOTE: When using a self-signed certificate, your browser will display a NET::ERR_CERT_AUTHORITY_INVALID error. Depending on the browser, there are mechanisms to accept the risk of insecure browsing and proceed to the C3 page; optionally, you can import the CA cert into your OS/browser certificate trust chain and restart the browser.
If you have not torn down the producer application, you should see the topic and its content.
IMPORTANT: You will now be prompted for user credentials to log into C3. Use `admin-user` / `admin-password`, or any valid user from the `basic-users-c3` secret created before.
Use the following queries to test Schema Registry and ksqldb from within C3
```sql
CREATE STREAM users (id INTEGER KEY, gender STRING, name STRING, age INTEGER)
  WITH (kafka_topic='users', partitions=1, value_format='AVRO');

INSERT INTO users (id, gender, name, age) VALUES (0, 'female', 'sarah', 42);
INSERT INTO users (id, gender, name, age) VALUES (1, 'male', 'john', 28);
INSERT INTO users (id, gender, name, age) VALUES (42, 'female', 'jessica', 70);
```
NOTE: Push queries will appear to hang and will not work, since they need to open a WebSocket connection from your browser to ksqldb, and ksqldb is not reachable from outside the cluster in this exercise; the query remains here just for completeness.
But you can still check the `users` topic content and its assigned schema in the `Topics` section on the left side of the C3 UI.
```sql
-- Make sure to set auto.offset.reset=earliest
SELECT id, gender, name, age FROM users WHERE age<65 EMIT CHANGES;
-- You should get 2 records
```
Finally, tear everything down:

```shell
kubectl delete -f cp-platform.yaml
kubectl delete -f secrets/secrets.yml
```