This exercise adds RBAC for authorization, leveraging LDAP authentication. It contains several changes from the previous exercise ext-tls-basic, starting with the inclusion of the MDS service to drive the RBAC authorization mechanism.
This exercise will also use the Ingress Controller to access Kafka from outside the k8s cluster; this way there is only a single public entry point to maintain in DNS.
First, a checklist of steps from the previous exercises:
- Create an internal CA and key
- Provide the CA cert and key in a secret named ca-pair-sslcerts
- The CRDs of the cluster components will have tls.autoGeneratedCerts=true
- Bring or create external certificates for the components (mind the SANs)
- IMPORTANT: note the mds.services.confluent.acme.com entry in the services external certificates to expose the MDS endpoint. It is good practice to have two sets of certs, one for Kafka and one for the services
- Refer to the TLS External section in ../ext-tls-basic/README.md
- You should have services-external-tls and kafka-external-tls secrets
For each service we will prepare a secret with credentials to connect to the other services (where applicable) and to MDS, plus credentials for MDS to authenticate with LDAP.
The secrets needed are summarized in the following table; additional columns have been provided to help set up the commands to create them.
CAs and certs are not reflected in this list; only the MDS token pair mentioned in the next section is here, because its files are not actual certificates.
Secret | Secret filename | Purpose | Content |
---|---|---|---|
kafka-interbroker-creds | plain-interbroker.txt | Kafka LDAP user and password | file |
mds-key-pair | mdsPublicKey.pem & mdsTokenKeyPair.pem | Key pair for MDS to generate authorization tokens | see MDS Token Pair below |
mds-token | mdsPublicKey.pem | Public part of the MDS key pair for services to validate the tokens | see MDS Token Pair below |
mds-ldap-creds | ldap.txt | MDS LDAP credentials | file |
kafka-mds-creds | bearer.txt | Kafka credentials as in LDAP CN and password | file |
sr-mds-creds | bearer.txt | SR credentials as in LDAP CN and password | file |
connect-mds-creds | bearer.txt | Connect credentials as in LDAP CN and password | file |
ksql-mds-creds | bearer.txt | KSQL credentials as in LDAP CN and password | file |
c3-mds-creds | bearer.txt | C3 credentials as in LDAP CN and password | file |
connect-sr-basic | basic.txt | Connect credentials to log in to SR | same as connect-mds-creds |
ksql-sr-basic | basic.txt | KSQL credentials to log in to SR | same as ksql-mds-creds |
c3-sr-basic | basic.txt | C3 credentials to log in to SR | same as c3-mds-creds |
c3-connect-basic | basic.txt | C3 credentials to log in to Connect | same as c3-mds-creds |
c3-ksql-basic | basic.txt | C3 credentials to log in to ksqlDB | same as c3-mds-creds |
Using the table above you can create each of the needed secrets with the command below
kubectl create secret generic <secret> \
--from-file=<secret_filename>=<file> \
--namespace confluent
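For example, creating the interbroker credentials from the first row of the table; the username/password values below are placeholders, match them to the LDAP user you provisioned:
# Hypothetical credential values; adjust to your LDAP data
cat > plain-interbroker.txt <<'EOF'
username=kafka
password=kafka-secret
EOF
kubectl create secret generic kafka-interbroker-creds \
  --from-file=plain-interbroker.txt=plain-interbroker.txt \
  --namespace confluent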
For this exercise the secret contents are available in the secrets folder, and for "deployment" convenience a secrets.yml
has been prepared that nests a command similar to the one above. See secrets/README.md
kubectl apply -f secrets/secrets.yml
To follow this exercise you can deploy an LDAP service in another namespace of your cluster with the HELM chart provided in the ldap-service folder.
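A minimal install sketch, assuming the chart can be installed as-is from the ldap-service folder; the release name and namespace are chosen so the service resolves as ldap.ldap.svc.cluster.local, the address used by MDS below:
# Release "ldap" in namespace "ldap" yields the DNS name
# ldap.ldap.svc.cluster.local referenced in the MDS configuration
helm upgrade --install ldap ./ldap-service \
  --namespace ldap --create-namespace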
- Pre-configured groups: TODO
- Pre-configured users and the groups they belong to: TODO
MDS is configured as a service in the Kafka CRD.
spec:
...
services:
mds:
tls:
enabled: true
tokenKeyPair:
secretRef: mds-key-pair
provider:
type: ldap
ldap:
address: ldap://ldap.ldap.svc.cluster.local:389
authentication:
type: simple
simple:
secretRef: mds-ldap-creds
configurations:
groupNameAttribute: cn
groupObjectClass: group
groupMemberAttribute: member
groupMemberAttributePattern: CN=(.*),DC=confluent,DC=acme,DC=com
groupSearchBase: dc=confluent,dc=acme,dc=com
userNameAttribute: cn
userMemberOfAttributePattern: CN=(.*),DC=confluent,DC=acme,DC=com
userObjectClass: organizationalRole
userSearchBase: dc=confluent,dc=acme,dc=com
# Additional TLS Configuration available to connect with LDAP
# tls:
...
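Before deploying, you can sanity-check the bind credentials and search bases directly against the LDAP service. This is a sketch: it assumes the LDAP deployment is named ldap and that its image ships ldapsearch; adjust the exec target to your chart.
# Bind as the mds user and list role entries (cn attribute only)
kubectl exec -n ldap deploy/ldap -- \
  ldapsearch -H ldap://localhost:389 \
  -D "cn=mds,dc=confluent,dc=acme,dc=com" -w 'Developer!' \
  -b "dc=confluent,dc=acme,dc=com" "(objectClass=organizationalRole)" cn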
NOTE: We also need to add KafkaRest as an actual dependency in the Kafka CRD, to enable/configure the broker admin REST API, with credentials to connect to MDS.
spec:
...
dependencies:
...
kafkaRest:
authentication:
type: bearer
bearer:
secretRef: kafka-mds-creds
...
MDS needs an asymmetric key pair to create authentication tokens; bearers of these tokens will use them to authenticate to the services, and token authenticity will be validated with the public part of the key pair. Refer to the secrets section README.md.
mkdir generated
openssl genrsa -out generated/mds-key-priv.pem 2048
openssl rsa -in generated/mds-key-priv.pem -pubout -out generated/mds-key-pub.pem
kubectl create secret generic mds-key-pair \
--from-file=mdsPublicKey.pem=generated/mds-key-pub.pem \
--from-file=mdsTokenKeyPair.pem=generated/mds-key-priv.pem \
--namespace confluent
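Optionally, verify that the exported public key really matches the private key before creating the secret:
# Re-derive the public key from the private key and compare
diff <(openssl rsa -in generated/mds-key-priv.pem -pubout 2>/dev/null) \
  generated/mds-key-pub.pem && echo "key pair OK"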
MDS also needs credentials to authenticate with LDAP (bind); these must be provided in a secret with a file named ldap.txt
with username/password values (username as full CN)
### ldap.txt content example
username=cn=mds,dc=confluent,dc=acme,dc=com
password=Developer!
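Following the secrets table above, the corresponding secret can then be created like this (or via secrets/secrets.yml):
kubectl create secret generic mds-ldap-creds \
  --from-file=ldap.txt=ldap.txt \
  --namespace confluent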
KafkaRestClass is like the "admin" interface that CFK will use for the cluster; it is used to create topics, rolebindings, schemas, etc., using CRDs. Additionally, when deploying components it will be used to set up some rolebindings too, so it is important to add it to your deployment file (or just apply it before the other components but after Kafka).
apiVersion: platform.confluent.io/v1beta1
kind: KafkaRestClass
metadata:
name: default
namespace: confluent
labels:
component: restclass
spec:
kafkaClusterRef:
name: kafka
namespace: confluent
kafkaRest:
authentication:
type: bearer
bearer:
secretRef: kafka-mds-creds
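With the KafkaRestClass in place, other CRDs can reference it; for instance, a KafkaTopic can be managed declaratively. A minimal sketch, where the topic name and settings are illustrative:
kubectl apply -f - <<'EOF'
apiVersion: platform.confluent.io/v1beta1
kind: KafkaTopic
metadata:
  name: demo-topic            # illustrative topic name
  namespace: confluent
spec:
  replicas: 3
  partitionCount: 1
  kafkaRestClassRef:
    name: default             # the KafkaRestClass defined above
    namespace: confluent
EOF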
In this exercise Kafka external access will be provided via an Ingress Controller that will route each request to the right broker depending on its "hostname". To achieve this we need to set up the external listener (or a custom one) with externalAccess of type staticForHostBasedRouting.
Extract of the Kafka CRD...
spec:
...
listeners:
...
external:
authentication:
type: ldap
tls:
enabled: true
secretRef: kafka-external-tls
externalAccess:
type: staticForHostBasedRouting
staticForHostBasedRouting:
domain: kafka.confluent.acme.com
brokerPrefix: broker
port: 443
...
Internal clients and components can keep authenticating using SASL/PLAIN.
Components can switch to an oauthbearer mechanism to broker a token from MDS; in either case they will need to add MDS as a dependency.
As an example, the metricReporter component in the Kafka CRD:
spec:
...
metricReporter:
enabled: false
bootstrapEndpoint: kafka:9071
authentication:
type: oauthbearer
oauthbearer:
secretRef: kafka-mds-creds
tls:
enabled: true
...
Example of a component (SchemaRegistry CRD) authenticating with Kafka, plus the MDS dependency. NOTE: Components connecting to MDS only need the public part of the key pair
spec:
...
dependencies:
kafka:
bootstrapEndpoint: kafka:9071
authentication:
type: oauthbearer
oauthbearer:
secretRef: sr-mds-creds
tls:
enabled: true
mds:
endpoint: https://kafka.confluent.svc.cluster.local:8090
tokenKeyPair:
secretRef: mds-token
authentication:
type: bearer
bearer:
secretRef: sr-mds-creds
tls:
enabled: true
Component CRDs, other than Kafka, won't use spec.authentication BASIC; instead they have spec.authorization.type=rbac, which enables token-based authentication. They will now validate tokens from clients with the public part of the MDS key pair, thus the need for spec.dependencies.mds too.
Example in the SchemaRegistry CRD; in this exercise mds runs as a service in the brokers.
spec:
...
authorization:
type: rbac
...
dependencies:
...
mds:
endpoint: https://kafka.confluent.svc.cluster.local:8090
tokenKeyPair:
secretRef: mds-token
authentication:
type: bearer
bearer:
secretRef: sr-mds-creds
tls:
enabled: true
...
Clients will still submit "BASIC" username/password credentials to the HTTP-based components, and the component will broker a token with MDS for them. See the example below.
Example of the KSQL dependency on SR using "BASIC" to authenticate
spec:
...
dependencies:
...
schemaRegistry:
url: https://schemaregistry.confluent.svc.cluster.local:8081
authentication:
type: basic
basic:
secretRef: ksql-sr-basic
tls:
enabled: true
...
Rolebindings for each component should be added automatically by CFK the first time they are deployed, provided the KafkaRestClass
is also deployed; otherwise you will see ERROR logs in the operator pod.
You can check the rolebindings created using
kubectl get confluentrolebinding
For additional rolebindings you can use a custom CRD like the following example, which adds the rolebindings missing from our cluster setup (the connect user should have access to SR)
TODO
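Until that example is added, here is a hedged sketch of what such a rolebinding could look like, granting the connect user read access to SR subjects. The role and the Schema Registry cluster id format (id_<name>_<namespace>) are assumptions; compare with the rolebindings CFK already created before applying:
kubectl apply -f - <<'EOF'
apiVersion: platform.confluent.io/v1beta1
kind: ConfluentRolebinding
metadata:
  name: connect-sr-read        # illustrative name
  namespace: confluent
spec:
  principal:
    type: user
    name: connect
  role: DeveloperRead          # assumed role; adjust as needed
  clustersScopeByIds:
    schemaRegistryClusterId: id_schemaregistry_confluent  # assumed id format
  resourcePatterns:
    - resourceType: Subject
      name: "*"
      patternType: LITERAL
EOF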
External access to the cluster components will be handled by an Ingress controller, like in the previous exercise, but this time we will also provide access to Kafka through the same Ingress, with another set of rules so Kafka can leverage a different set of certificates.
NGINX Ingress Controller for Kubernetes can be installed using HELM; it must be installed in the same namespace as the services to be exposed.
helm repo add ingress-nginx https://kubernetes.github.io/ingress-nginx
helm repo update
NOTE: The controller configuration is different from the previous exercises in that we are activating ssl-passthrough for the Kafka Ingress; you may need to uninstall the controller if it remains from previous exercises, using helm uninstall ingress-nginx.
helm upgrade --install ingress-nginx ingress-nginx/ingress-nginx \
--set controller.extraArgs.enable-ssl-passthrough="true" \
--namespace confluent
NOTE: Make sure that the chart version is at least 4.2.1 and the app version 1.3.0 (check with the helm ls command)
Each service with multiple instances needs to be bootstrapped for the ingress to redirect to any of them. CFK already creates ClusterIP "bootstrap" services for each component with appropriate selectors.
kubectl describe svc schemaregistry
kubectl describe svc connect
kubectl describe svc ksqldb
A Kafka bootstrap service is not deployed with staticForHostBasedRouting, but we can create a bootstrap service ourselves and expose it in the Ingress. See cp-kafka-bootstrap.yaml
Doing the same for MDS (port 8090 of the brokers) will allow us to expose it via the services Ingress (needs re-encryption) and connect with the Confluent CLI; see cp-mds-bootstrap.yaml
The Ingress example file cp-ingress-services.yaml defines rules for each service with "dns"-based "routing"; at the same time, the tls definition indicates that TLS should be terminated for the listed domains and "re-encrypted" using the certificate indicated in spec.tls.secretName.
The Ingress cp-ingress-kafka.yaml defines the rules to access Kafka, and the Kafka external listener must be configured with an externalAccess of type staticForHostBasedRouting. This Ingress doesn't terminate SSL, it is an SSL passthrough, since the external certificates are already configured in the Kafka listener spec.listeners.external.tls.secretRef.
Important annotations for the Kafka Ingress are...
metadata:
annotations:
nginx.ingress.kubernetes.io/ssl-passthrough: "true"
ingress.kubernetes.io/ssl-passthrough: "true"
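For reference, the rules in that Ingress could look roughly like the sketch below. The backend service names assume CFK's per-broker kafka-N-internal services plus the custom kafka-bootstrap service, and the broker port (9092) is an assumption; verify against the actual cp-ingress-kafka.yaml.
spec:
  rules:
    - host: bootstrap.kafka.confluent.acme.com
      http:
        paths:
          - pathType: ImplementationSpecific
            backend:
              service:
                name: kafka-bootstrap      # custom bootstrap service
                port:
                  number: 9092
    - host: broker0.kafka.confluent.acme.com
      http:
        paths:
          - pathType: ImplementationSpecific
            backend:
              service:
                name: kafka-0-internal     # created by CFK per broker
                port:
                  number: 9092
    # ...one rule per remaining broker (broker1, broker2)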
We have added labels to each CRD in order to allow deployment and re-deployment by component. The following is the recommended deploy order, after applying the secrets, including the required certificates, etc.
# Cluster Components
kubectl apply -f cp-platform.yaml -l component=zk
kubectl apply -f cp-platform.yaml -l component=kafka
kubectl apply -f cp-platform.yaml -l component=restclass
kubectl apply -f cp-platform.yaml -l component=sr
kubectl apply -f cp-platform.yaml -l component=connect
kubectl apply -f cp-platform.yaml -l component=ksqldb
kubectl apply -f cp-platform.yaml -l component=c3
# Bootstraps
kubectl apply -f cp-kafka-bootstrap.yaml
kubectl apply -f cp-mds-bootstrap.yaml
# Ingress
kubectl apply -f cp-ingress-services.yaml
kubectl apply -f cp-ingress-kafka.yaml
Once the platform is deployed, only the ingress controller will expose a public IP (for access from outside k8s); you should update your DNS records for all the cluster's services to point to that IP. Assuming you do this locally via the /etc/hosts
file...
# fetch the IP of the Ingress controller...
INGRESS_IP=$(kubectl get svc ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[*].ip}')
An example of how the entries in your /etc/hosts
file could look
<INGRESS_IP> schemaregistry.services.confluent.acme.com
<INGRESS_IP> connect.services.confluent.acme.com
<INGRESS_IP> ksqldb.services.confluent.acme.com
<INGRESS_IP> controlcenter.services.confluent.acme.com
<INGRESS_IP> mds.services.confluent.acme.com
<INGRESS_IP> bootstrap.kafka.confluent.acme.com
<INGRESS_IP> broker0.kafka.confluent.acme.com
<INGRESS_IP> broker1.kafka.confluent.acme.com
<INGRESS_IP> broker2.kafka.confluent.acme.com
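Those entries can be appended with a quick loop using the variable fetched above (a sketch; requires sudo and assumes the standard /etc/hosts location):
# Append one line per exposed hostname
for h in schemaregistry.services connect.services ksqldb.services \
  controlcenter.services mds.services bootstrap.kafka \
  broker0.kafka broker1.kafka broker2.kafka; do
  echo "$INGRESS_IP $h.confluent.acme.com" | sudo tee -a /etc/hosts
done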
Find the external IP of any of the brokers or the bootstrap, and check the exposed certificates with openssl
## CHECK EXTERNAL CERTIFICATE
# Since the ingress works as a reverse proxy, we need an additional argument with the hostname used in the ingress rules... we can connect to the ingress IP instead of the hostname
CLUSTER_IP=$(kubectl get svc ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[*].ip}')
openssl s_client -connect $CLUSTER_IP:443 \
-servername bootstrap.kafka.confluent.acme.com </dev/null 2>/dev/null | \
openssl x509 -noout -text | \
grep -E '(Issuer: | DNS:)'
openssl s_client -connect $CLUSTER_IP:443 \
-servername broker0.kafka.confluent.acme.com </dev/null 2>/dev/null | \
openssl x509 -noout -text | \
grep -E '(Issuer: | DNS:)'
openssl s_client -connect $CLUSTER_IP:443 \
-servername broker1.kafka.confluent.acme.com </dev/null 2>/dev/null | \
openssl x509 -noout -text | \
grep -E '(Issuer: | DNS:)'
openssl s_client -connect $CLUSTER_IP:443 \
-servername broker2.kafka.confluent.acme.com </dev/null 2>/dev/null | \
openssl x509 -noout -text | \
grep -E '(Issuer: | DNS:)'
To consume from and produce to Kafka from outside the k8s cluster, using the external listener, we can use kafka-topics, kafka-producer-perf-test and kafka-console-consumer from the command line.
We first need to set up the properties file to connect, and the truststore
# Import the CA pem file into a jks for the truststore
keytool -noprompt -import -alias ca \
  -keystore ../tlscerts/generated/truststore.jks \
  -storepass changeme \
  -file ../tlscerts/generated/ExternalCAcert.pem
# Prepare the properties file with the newly created JKS (absolute path)
sed "s|TRUSTORE_LOCATION|$(ls $PWD/../tlscerts/generated/truststore.jks)|g" external-client.properties.tmpl > external-client.properties
NOTE: For these tests the properties file logs in as the kafka
superuser, to skip creating a rolebinding. Also note the port exposed by the ingress (443).
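For reference, the resulting external-client.properties presumably looks like the sketch below; the exact values (notably the kafka user's password) are assumptions that depend on your LDAP data and the template file.
# Sketch of external-client.properties after the sed above
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="kafka" password="<kafka-ldap-password>";
ssl.truststore.location=/absolute/path/to/truststore.jks
ssl.truststore.password=changeme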
## CREATE A TOPIC
kafka-topics --bootstrap-server bootstrap.kafka.confluent.acme.com:443 \
--command-config external-client.properties \
--create \
--topic app-topic \
--replication-factor 3 \
--partitions 1
Generate data in one terminal window
kafka-producer-perf-test \
--topic app-topic \
--record-size 64 \
--throughput 5 \
--producer-props bootstrap.servers=bootstrap.kafka.confluent.acme.com:443 \
--producer.config external-client.properties \
--num-records 1000
Consume data in another terminal window
kafka-console-consumer \
--bootstrap-server bootstrap.kafka.confluent.acme.com:443 \
--consumer.config external-client.properties \
--property print.partition=true \
--property print.offset=true \
--topic app-topic \
--from-beginning \
--timeout-ms 30000
## CHECK EXTERNAL CERTIFICATE
# Since the ingress works as a reverse proxy, we need an additional argument with the hostname used in the ingress rules... we can connect to the ingress IP instead of the hostname
CLUSTER_IP=$(kubectl get svc ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[*].ip}')
openssl s_client -connect $CLUSTER_IP:443 \
-servername mds.services.confluent.acme.com </dev/null 2>/dev/null | \
openssl x509 -noout -text | \
grep -E '(Issuer: | DNS:)'
This is a simple check to confirm the REST endpoint works
## CHECK EXTERNAL CERTIFICATE
# Since the ingress works as a reverse proxy, we need an additional argument with the hostname used in the ingress rules... we can connect to the ingress IP instead of the hostname
CLUSTER_IP=$(kubectl get svc ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[*].ip}')
openssl s_client -connect $CLUSTER_IP:443 \
-servername schemaregistry.services.confluent.acme.com </dev/null 2>/dev/null | \
openssl x509 -noout -text | \
grep -E '(Issuer: | DNS:)'
## CHECK THE SERVICE - NO USER
# use -k if you have not added the CA to the trusted chain of your host
curl -k https://schemaregistry.services.confluent.acme.com/schemas/types
## CHECK THE SERVICE - AUTHENTICATING
# use -k if you have not added the CA to the trusted chain of your host
curl -k -u sr:sr-secret https://schemaregistry.services.confluent.acme.com/schemas/types
This is a simple check to confirm the REST endpoint works
## CHECK EXTERNAL CERTIFICATE
# Since the ingress works as a reverse proxy, we need an additional argument with the hostname used in the ingress rules... we can connect to the ingress IP instead of the hostname
CLUSTER_IP=$(kubectl get svc ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[*].ip}')
openssl s_client -connect $CLUSTER_IP:443 \
-servername connect.services.confluent.acme.com </dev/null 2>/dev/null | \
openssl x509 -noout -text | \
grep -E '(Issuer: | DNS:)'
## CHECK THE SERVICE - NO USER
# use -k if you have not added the CA to the trusted chain of your host
curl -k https://connect.services.confluent.acme.com
## CHECK THE SERVICE - AUTHENTICATING
# use -k if you have not added the CA to the trusted chain of your host
curl -k -u connect:connect-secret https://connect.services.confluent.acme.com
This is a simple check to confirm the REST endpoint works
## CHECK EXTERNAL CERTIFICATE
# Since the ingress works as a reverse proxy, we need an additional argument with the hostname used in the ingress rules... we can connect to the ingress IP instead of the hostname
CLUSTER_IP=$(kubectl get svc ingress-nginx-controller -o jsonpath='{.status.loadBalancer.ingress[*].ip}')
openssl s_client -connect $CLUSTER_IP:443 \
-servername ksqldb.services.confluent.acme.com </dev/null 2>/dev/null | \
openssl x509 -noout -text | \
grep -E '(Issuer: | DNS:)'
## CHECK THE SERVICE - NO USER - DOES NOT GIVE ERROR JUST AN EMPTY RESPONSE
# use -k if you have not added the CA to the trusted chain of your host
curl -k https://ksqldb.services.confluent.acme.com/info
## CHECK THE SERVICE - AUTHENTICATING
# use -k if you have not added the CA to the trusted chain of your host
curl -k -u ksql:ksql-secret https://ksqldb.services.confluent.acme.com/info
Just navigate to https://controlcenter.services.confluent.acme.com/, where you will be challenged with a login; use c3
/ c3-secret
NOTE: When using self-signed certificates, your browser will display a NET::ERR_CERT_AUTHORITY_INVALID
error message. Depending on the browser, there are mechanisms to override this, accept the risk of insecure browsing, and proceed to the C3 page; optionally, you can import the CA cert into your OS/browser certificate trust chain and restart the browser.
The c3 user might not have all the privileges to view all the services; you must create the rolebindings or log in with a user with more privileges
NOTE: Apply the rolebindings in cp-testadmin-rolebinding.yaml to grant SystemAdmin on all the components to user testadmin
(password: testadmin)
Use the following queries to test Schema Registry and ksqldb from within C3 using the testadmin user.
CREATE STREAM users (id INTEGER KEY, gender STRING, name STRING, age INTEGER) WITH (kafka_topic='users', partitions=1, value_format='AVRO');
INSERT INTO users (id, gender, name, age) VALUES (0, 'female', 'sarah', 42);
INSERT INTO users (id, gender, name, age) VALUES (1, 'male', 'john', 28);
INSERT INTO users (id, gender, name, age) VALUES (42, 'female', 'jessica', 70);
NOTE: Since we disabled authentication for ksqldb, the push query should work properly, as the socket would be created via the ingress (assuming advertisedUrl for the ksql dependency was set in the controlcenter CRD)
-- Make sure to set auto.offset.reset=earliest
SELECT id, gender, name, age FROM users WHERE age<65 EMIT CHANGES;
-- You should get 2 records
With MDS exposed you can log in using:
confluent login --url https://mds.services.confluent.acme.com:443
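If the CA is not in your host's trust chain, the CLI can be pointed at it explicitly (flag available in recent Confluent CLI versions; the path below is the CA generated in the TLS exercise):
confluent login --url https://mds.services.confluent.acme.com:443 \
  --ca-cert-path ../tlscerts/generated/ExternalCAcert.pem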
kubectl delete -f cp-platform.yaml
kubectl delete -f cp-kafka-bootstrap.yaml
kubectl delete -f cp-mds-bootstrap.yaml
kubectl delete -f cp-ingress-services.yaml
kubectl delete -f cp-ingress-kafka.yaml
kubectl delete -f secrets/secrets.yml
https://docs.confluent.io/operator/current/co-rbac.html
https://github.com/kubernetes/ingress-nginx/tree/main/charts/ingress-nginx
https://docs.confluent.io/operator/current/co-rbac.html#troubleshooting-verify-mds-configuration