From 739b9ceee2eacc7d14717810e0824fae53e1ec02 Mon Sep 17 00:00:00 2001 From: George Sun Date: Sun, 8 Dec 2024 17:24:16 +0800 Subject: [PATCH] Topic enforcer to generate Strimzi KafkaTopic resources. --- kafka_topic_enforcer/README.md | 16 +++ .../java/com/tesla/data/topic/enforcer/BUILD | 5 + .../com/tesla/data/topic/enforcer/Main.java | 1 + .../data/topic/enforcer/StrimziCommand.java | 97 +++++++++++++++++++ 4 files changed, 119 insertions(+) create mode 100644 kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java diff --git a/kafka_topic_enforcer/README.md b/kafka_topic_enforcer/README.md index f09a211..4769efc 100644 --- a/kafka_topic_enforcer/README.md +++ b/kafka_topic_enforcer/README.md @@ -9,6 +9,8 @@ Kafka topic enforcer's goal is to automate Kafka topic management & hence remove * Self service, removed dependency on a human * Simple configuration +If you choose to use Strimzi Kafka operator, this command can also generate Strimzi KafkaTopic CRDs. + ## Dependencies * JVM @@ -51,6 +53,9 @@ Usage:
[options] [command] [command options] enforce Enforce given configuration Usage: enforce [options] path to a configuration file Options: + --cluster + a cluster name, if specified, consolidated (multi-cluster) + configuration file is expected --continuous, -c run enforcement continuously Default: false @@ -63,6 +68,17 @@ Usage:
[options] [command] [command options] --unsafemode run in unsafe mode, topic deletion is _only_ allowed in this mode Default: false + strimzi Generate the Strimzi KafkaTopic resources YAML on stdout from + the topic config. Certain information such as topic tags and + config comments would be missed, and the resource + metadata names are also converted to conform RFC 1123. + Usage: strimzi [options] /path/to/a/configuration/file + Options: + --cluster + a cluster name, if specified, consolidated (multi-cluster) + configuration file is expected + * --kafka_cluster, -k + the name of the Strimzi Kafka cluster to be generated for ``` ## Configuration diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/BUILD b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/BUILD index 550f718..539e1f2 100644 --- a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/BUILD +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/BUILD @@ -10,8 +10,13 @@ java_library( "//3rdparty/jvm/com/fasterxml/jackson/core:jackson_databind", "//3rdparty/jvm/com/fasterxml/jackson/dataformat:jackson_dataformat_yaml", "//3rdparty/jvm/com/fasterxml/jackson/module:jackson_module_parameter_names", + "//3rdparty/jvm/com/google/guava", + "//3rdparty/jvm/io/fabric8:kubernetes_client_api", + "//3rdparty/jvm/io/fabric8:kubernetes_model_common", + "//3rdparty/jvm/io/fabric8:kubernetes_model_core", "//3rdparty/jvm/io/prometheus:simpleclient", "//3rdparty/jvm/io/prometheus:simpleclient_httpserver", + "//3rdparty/jvm/io/strimzi:api", "//3rdparty/jvm/org/apache/kafka:kafka_clients", "//3rdparty/jvm/org/slf4j:slf4j_api", "//3rdparty/jvm_shaded/com/google/guava_shaded", diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/Main.java b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/Main.java index 8dde211..6173a6e 100644 --- a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/Main.java +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/Main.java @@ -48,6 +48,7 @@ public static void main(String[] args) { final Main main = new Main(); final Map> commands = new HashMap<>(); commands.put("validate", new BaseCommand<>()); + commands.put("strimzi", new StrimziCommand()); commands.put("dump", new DumpCommand()); commands.put("enforce", new TopicEnforceCommand()); diff --git a/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java new file mode 100644 index 0000000..1fffd39 --- /dev/null +++ b/kafka_topic_enforcer/src/main/java/com/tesla/data/topic/enforcer/StrimziCommand.java @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2024 Tesla Motors, Inc. All rights reserved. + */ + +package com.tesla.data.topic.enforcer; + +import com.tesla.data.enforcer.BaseCommand; + +import com.beust.jcommander.Parameter; +import com.beust.jcommander.Parameters; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; +import com.google.common.base.CaseFormat; +import com.google.common.hash.Hashing; +import io.strimzi.api.kafka.model.topic.KafkaTopic; +import io.strimzi.api.kafka.model.topic.KafkaTopicBuilder; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Parameters(commandDescription = "Generate the Strimzi KafkaTopic resources YAML " + + "(https://strimzi.io/docs/operators/latest/configuring.html#type-KafkaTopic-reference) " + + "on stdout from the topic config. Certain information such as topic tags and config comments would be missed, " + + "and the resource metadata names are also converted to conform RFC 1123.") +public class StrimziCommand extends BaseCommand { + + @Parameter( + names = {"--kafka_cluster", "-k"}, + description = "the name of the Strimzi Kafka cluster to be generated for", + required = true) + protected String kafkaCluster; + + @Override + public int run() { + List configuredTopics = configuredEntities(ConfiguredTopic.class, "topics", "topicsFile"); + + ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory() + .configure(YAMLGenerator.Feature.WRITE_DOC_START_MARKER, true) + .configure(YAMLGenerator.Feature.LITERAL_BLOCK_STYLE, true) + .configure(YAMLGenerator.Feature.MINIMIZE_QUOTES, true) + .configure(YAMLGenerator.Feature.INDENT_ARRAYS, false) + .configure(YAMLGenerator.Feature.SPLIT_LINES, false) + ); + + try { + for (ConfiguredTopic topic : configuredTopics) { + System.out.println(yamlMapper.writeValueAsString(getKafkaTopic(topic))); + } + } catch (JsonProcessingException e) { + LOG.error("Failed to dump config", e); + return FAILURE; + } + + return SUCCESS; + } + + /** + * Takes the best effort to convert the topic names to a resource name compatible format. We are not aiming at perfect + * correctness for mixed cases. Instead, we take some heuristics to get some good enough results. + */ + private String toResourceName(String s) { + // If the topic name contains an underscore, we assume it follows underscore case. + if (s.contains("_")) { + return CaseFormat.LOWER_UNDERSCORE.to(CaseFormat.LOWER_HYPHEN, s.toLowerCase()).replaceAll("[^-.a-z0-9]",""); + } + // We assume the original name is upper camel case and take the best effort, even if it's not. + return CaseFormat.UPPER_CAMEL.to(CaseFormat.LOWER_HYPHEN, s).replaceAll("[^-.a-z0-9]",""); + } + + private KafkaTopic getKafkaTopic(ConfiguredTopic enforcerTopic) { + // Kubernetes resources follow RFC 1123 naming convention, so we have to remove the unsupported characters. To avoid + // duplications, we are appending a short hash string of the original topic name. + String topicHash = Hashing.sha256().hashString(enforcerTopic.getName(), StandardCharsets.UTF_8) + .toString().substring(0, 6); + // There could be multiple topics with the same name but belonging to different Kafka clusters. They are different + // topics. To avoid ambiguity, we prepend the Kafka cluster names in the KafkaTopic metadata name. + String resourceName = kafkaCluster + "." + toResourceName(enforcerTopic.getName()) + "-" + topicHash; + + return new KafkaTopicBuilder() + .withNewMetadata() + .withName(resourceName) + .withLabels(Map.of("strimzi.io/cluster", kafkaCluster)) + .endMetadata() + .withNewSpec() + .withTopicName(enforcerTopic.getName()) + .withPartitions(enforcerTopic.getPartitions()) + .withReplicas((int) enforcerTopic.getReplicationFactor()) + .withConfig(new HashMap<>(enforcerTopic.getConfig())) + .endSpec().build(); + } +} + +