Explore Apache Kafka data pipelines in Kubernetes.
Note: We are participating in the annual Hacktoberfest. If you're looking to contribute, please see our open issues and use the standalone installation for development.
- Visualization of streaming applications, topics, and connectors
- Monitor all or individual pipelines from multiple namespaces
- Inspection of Avro schema from schema registry
- Integration with streams-bootstrap and faust-bootstrap, or custom streaming app config parsing from Kubernetes deployments using plugins
- Real-time metrics from Prometheus (consumer lag & read rate, replicas, topic size, messages in & out per second, connector tasks)
- Linking to external services for logging and analysis, such as Kibana, Grafana, Loki, AKHQ, Redpanda Console, and Elasticsearch
- Customizable through Python plugins
Visit our introduction blog post for a complete overview and demo of Streams Explorer.
Prerequisites: Access to a Kubernetes cluster where streaming apps and services are deployed.
- Forward the ports to Prometheus. (Kafka Connect, Schema Registry, and other integrations are optional)
- Start the container
docker compose up
Once the container has started, visit http://localhost:8080
- Add the Helm chart repository
helm repo add streams-explorer https://bakdata.github.io/streams-explorer
- Install
helm upgrade --install --values helm-chart/values.yaml streams-explorer streams-explorer/streams-explorer
- Install dependencies using Poetry
poetry install
- Forward the ports to Prometheus. (Kafka Connect, Schema Registry, and other integrations are optional)
- Configure the backend in settings.yaml.
- Start the backend server
poetry run start
- Install dependencies
npm ci
- Start the frontend server
npm run build && npm run prod
Visit http://localhost:3000
Depending on your type of installation, set the configuration for the backend server in the corresponding file:
- Docker Compose: docker-compose.yaml
- Kubernetes: helm-chart/values.yaml
- standalone: backend/settings.yaml
In helm-chart/values.yaml, configuration is done either through the config section using double underscore notation, e.g. K8S__consumer_group_annotation: consumerGroup, or by pasting the content of backend/settings.yaml under the settings section. Alternatively, all configuration options can be written as environment variables using double underscore notation and the prefix SE, e.g. SE_K8S__deployment__cluster=false.
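To make the double underscore notation concrete, here is a minimal sketch of how such environment variables could be folded into a nested settings structure. It is not the loader Streams Explorer actually uses; the helper names env_to_settings and parse_value, the lower-casing of keys, and the simple bool/int coercion are all assumptions made for illustration.

```python
from typing import Any, Mapping


def parse_value(raw: str) -> Any:
    """Coerce a raw environment string to bool or int, else keep it as str."""
    lowered = raw.lower()
    if lowered in ("true", "false"):
        return lowered == "true"
    try:
        return int(raw)
    except ValueError:
        return raw


def env_to_settings(env: Mapping[str, str], prefix: str = "SE") -> dict:
    """Build a nested dict from variables such as SE_K8S__deployment__cluster."""
    settings: dict = {}
    marker = f"{prefix}_"
    for key, raw in env.items():
        if not key.startswith(marker):
            continue  # ignore variables without the prefix
        # Each double underscore opens one nesting level.
        # Lower-casing the parts is an assumption of this sketch.
        parts = [part.lower() for part in key[len(marker):].split("__")]
        node = settings
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = parse_value(raw)
    return settings


example = env_to_settings(
    {
        "SE_K8S__deployment__cluster": "false",
        "SE_GRAPH__update_interval": "60",
        "HOME": "/root",  # unrelated variable, skipped
    }
)
print(example)
```

In this sketch, SE_K8S__deployment__cluster=false ends up at the same place as k8s.deployment.cluster in backend/settings.yaml, which is the correspondence the notation is meant to express.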
The following configuration options are available:
- graph.update_interval: Render the graph every x seconds (int, required, default: 30)
- graph.layout_arguments: Arguments passed to graphviz layout (string, required, default: -Grankdir=LR -Gnodesep=0.8 -Gpad=10)
- graph.pipeline_distance: Increase/decrease vertical space between pipeline graphs by X pixels (int, required, default: 500)
- graph.resolve.input_pattern_topics.all: If true, topics that match (extra) input pattern(s) are connected to the streaming app in the graph containing all pipelines (bool, required, default: false)
- graph.resolve.input_pattern_topics.pipelines: If true, topics that match (extra) input pattern(s) are connected to the streaming app in pipeline graphs (bool, required, default: false)
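The two input_pattern_topics options decide whether topics matching an app's input pattern are drawn as connected to it. The sketch below illustrates what resolving a pattern against known topic names could look like. The function name resolve_input_pattern is hypothetical, and requiring the whole topic name to match, as well as using Python's regex dialect, are assumptions rather than documented Streams Explorer behavior.

```python
import re


def resolve_input_pattern(pattern: str, topic_names: list[str]) -> list[str]:
    """Return the topics whose full name matches the given input pattern."""
    compiled = re.compile(pattern)
    # fullmatch: the pattern must cover the entire topic name (assumption).
    return [name for name in topic_names if compiled.fullmatch(name)]


topics = ["orders-eu", "orders-us", "my-orders-eu", "payments"]
matched = resolve_input_pattern(r"orders-.*", topics)
print(matched)
```

With either option set to true, each topic in matched would get an edge to the streaming app in the respective graph; with false, only explicitly configured input topics would be connected.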
- kafka.enable: Enable Kafka (bool, default: false)
- kafka.config: librdkafka configuration properties (reference) (dict, default: {"bootstrap.servers": "localhost:9092"})
- kafka.displayed_information: Configuration options of Kafka topics displayed in the frontend (list of dict)
- kafka.topic_names_cache.ttl: Cache for retrieving all topic names (used when input topic patterns are resolved) (int, default: 3600)
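As an illustration of what kafka.topic_names_cache.ttl controls, the following sketch caches the result of a topic listing call and refreshes it once the TTL has elapsed. The class TopicNamesCache and its fetch and clock parameters are hypothetical and not part of Streams Explorer; the sketch also assumes the TTL is measured in the same unit as the injected clock.

```python
import time
from typing import Callable, Optional


class TopicNamesCache:
    """Cache the list of topic names and refetch it after ttl has elapsed."""

    def __init__(
        self,
        fetch: Callable[[], list[str]],
        ttl: float = 3600,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch  # callable that lists all topic names
        self._ttl = ttl
        self._clock = clock  # injectable so the cache can be tested
        self._names: Optional[list[str]] = None
        self._expires_at = 0.0

    def get(self) -> list[str]:
        now = self._clock()
        if self._names is None or now >= self._expires_at:
            # Cache is empty or stale: fetch again and restart the TTL.
            self._names = self._fetch()
            self._expires_at = now + self._ttl
        return self._names


# Usage with a stand-in fetch function instead of a real Kafka client.
cache = TopicNamesCache(lambda: ["orders-eu", "orders-us"], ttl=3600)
print(cache.get())
```

A longer TTL means fewer listing calls against the cluster, at the cost of newly created topics taking longer to show up when input patterns are resolved.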
- kafkaconnect.url: URL of Kafka Connect server (string, default: None)
- kafkaconnect.update_interval: Fetch connectors every x seconds (int, default: 300)
- kafkaconnect.displayed_information: Configuration options of Kafka connectors displayed in the frontend (list of dict)
- k8s.deployment.cluster: Whether streams-explorer is deployed to Kubernetes cluster (bool, required, default: false)
- k8s.deployment.context: Name of cluster (string, optional if running in cluster, default: kubernetes-cluster)
- k8s.deployment.namespaces: Kubernetes namespaces (list of string, required, default: ['kubernetes-namespace'])
- k8s.containers.ignore: Name of containers that should be ignored/hidden (list of string, default: ['prometheus-jmx-exporter'])
- k8s.displayed_information: Details of pod that should be displayed (list of dict, default: [{'name': 'Labels', 'key': 'metadata.labels'}])
- k8s.labels: Labels used to set attributes of nodes (list of string, required, default: ['pipeline'])
- k8s.pipeline.label: Attribute of nodes the pipeline name should be extracted from (string, required, default: pipeline)
- k8s.consumer_group_annotation: Annotation the consumer group name should be extracted from (string, required, default: consumerGroup)
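The interplay of k8s.labels, k8s.pipeline.label, and k8s.consumer_group_annotation can be sketched on a plain dictionary that mimics a deployment manifest. The helper names node_attributes and consumer_group are hypothetical, and reading both labels and annotations from the top-level metadata of the deployment is an assumption of this sketch, not a statement about where Streams Explorer looks.

```python
from typing import Any, Optional


def node_attributes(manifest: dict[str, Any], labels: list[str]) -> dict[str, str]:
    """Copy the configured labels (k8s.labels) into node attributes."""
    found = manifest.get("metadata", {}).get("labels") or {}
    return {label: found[label] for label in labels if label in found}


def consumer_group(
    manifest: dict[str, Any], annotation: str = "consumerGroup"
) -> Optional[str]:
    """Read the consumer group from the configured annotation, if present."""
    annotations = manifest.get("metadata", {}).get("annotations") or {}
    return annotations.get(annotation)


# Made-up manifest for illustration only.
deployment = {
    "metadata": {
        "name": "fraud-detector",
        "labels": {"pipeline": "atm-fraud", "app": "fraud-detector"},
        "annotations": {"consumerGroup": "fraud-detector-group"},
    }
}

attributes = node_attributes(deployment, labels=["pipeline"])
pipeline_name = attributes.get("pipeline")  # k8s.pipeline.label = pipeline
group = consumer_group(deployment)
print(attributes, pipeline_name, group)
```

Labels not listed in k8s.labels (here, app) are left out of the node attributes, which is why the label named by k8s.pipeline.label also needs to appear in that list.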
- schemaregistry.url: URL of Confluent Schema Registry or Karapace (string, default: None)
- prometheus.url: URL of Prometheus (string, required, default: http://localhost:9090)
The following exporters are required to collect Kafka metrics for Prometheus:
- akhq.enable: Enable AKHQ (bool, default: false)
- akhq.url: URL of AKHQ (string, default: http://localhost:8080)
- akhq.cluster: Name of cluster (string, default: kubernetes-cluster)
- akhq.connect: Name of connect (string, default: None)
Redpanda Console can be used instead of AKHQ (the two are mutually exclusive).
- redpanda_console.enable: Enable Redpanda Console (bool, default: false)
- redpanda_console.url: URL of Redpanda Console (string, default: http://localhost:8080)
- grafana.enable: Enable Grafana (bool, default: false)
- grafana.url: URL of Grafana (string, default: http://localhost:3000)
- grafana.dashboards.topics: Path to topics dashboard (string). Sample dashboards for topics and consumer groups are included in the ./grafana subfolder.
- grafana.dashboards.consumergroups: Path to consumer groups dashboard (string)
- kibanalogs.enable: Enable Kibana logs (bool, default: false)
- kibanalogs.url: URL of Kibana logs (string, default: http://localhost:5601)
Loki can be used instead of Kibana (the two are mutually exclusive).
- loki.enable: Enable Loki logs (bool, default: false)
- loki.url: URL of Loki logs (string, default: http://localhost:3000)
For the Kafka Connect Elasticsearch connector:
- esindex.url: URL of Elasticsearch index (string, default: http://localhost:5601/app/kibana#/dev_tools/console)
- plugins.path: Path to folder containing plugins relative to backend (string, required, default: ./plugins)
- plugins.extractors.default: Whether to load default extractors (bool, required, default: true)
ATM Fraud detection with streams-bootstrap
It is possible to create your own config parser, linker, metric provider, and extractors in Python by implementing the K8sConfigParser, LinkingService, MetricProvider, or Extractor classes. This way you can customize Streams Explorer to your specific setup and services. As an example, we provide the DefaultLinker as a LinkingService. The default MetricProvider supports Prometheus. Furthermore, the following default Extractor plugins are included:
If your streaming application deployments are configured through environment variables, following the schema of streams-bootstrap or faust-bootstrap, Streams Explorer works out of the box with the default deployment parser.
For streams-bootstrap deployments configured through CLI arguments, a separate parser can be loaded by creating a Python file (e.g. config_parser.py) in the plugins folder with the following import statement:
from streams_explorer.core.k8s_config_parser import StreamsBootstrapArgsParser
For other setups, a custom config parser plugin can be created by inheriting from the K8sConfigParser class and implementing the parse method. In this example, we retrieve the streaming app configurations from an external REST API. For a deployment to be identified as a streaming app, input and output topics are required.
import httpx

from streams_explorer.core.k8s_config_parser import K8sConfigParser
from streams_explorer.models.k8s import K8sConfig


class CustomConfigParser(K8sConfigParser):
    def get_name(self) -> str:
        name = self.k8s_app.metadata.name
        if not name:
            raise TypeError(f"Name is required for {self.k8s_app.class_name}")
        return name

    def parse(self) -> K8sConfig:
        """Retrieve app config from REST endpoint."""
        name = self.get_name()
        # "url" is a placeholder for the base URL of the external REST API
        data = httpx.get(f"url/config/{name}").json()
        return K8sConfig(**data)