From 873ee0305d5eca778a8c19e3cf4866e99babda01 Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Thu, 31 Oct 2024 12:28:17 +0100 Subject: [PATCH 01/13] feat: WIP: Added kafka --- src/modules/services/kafka.nix | 82 ++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 src/modules/services/kafka.nix diff --git a/src/modules/services/kafka.nix b/src/modules/services/kafka.nix new file mode 100644 index 000000000..105ee1415 --- /dev/null +++ b/src/modules/services/kafka.nix @@ -0,0 +1,82 @@ +{ pkgs, lib, config, ... }: + +let + cfg = config.services.kafka; + types = lib.types; + +in +{ + options.services.kafka = { + enable = lib.mkEnableOption "Apache Kafka"; + + package = lib.mkOption { + type = types.package; + description = "Which Apache Kafka package to use"; + default = pkgs.apacheKafka; + defaultText = "pkgs.apacheKafka"; + }; + + # listenPort = lib.mkOption { + # description = "Kafka port to listen on."; + # default = 9092; + # type = types.port; + # }; + + # config = lib.mkOption { + # type = types.attrs; + # default = {}; + # }; + }; + + config = + let + # From config file example + stateDir = config.env.DEVENV_STATE + "/kafka"; + clusterIdFile = stateDir + "/clusterid"; + logsDir = stateDir + "/logs"; + # TODO: Make these options configurable + serverProperties = pkgs.writeText "server.properties" '' + process.roles=broker,controller + node.id=1 + controller.quorum.voters=1@localhost:9093 + listeners=PLAINTEXT://:9092,CONTROLLER://:9093 + inter.broker.listener.name=PLAINTEXT + advertised.listeners=PLAINTEXT://localhost:9092 + controller.listener.names=CONTROLLER + listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL + num.network.threads=3 + num.io.threads=8 + socket.send.buffer.bytes=102400 + socket.receive.buffer.bytes=102400 + socket.request.max.bytes=104857600 + log.dir=${logsDir} + num.partitions=1 + num.recovery.threads.per.data.dir=1 + 
offsets.topic.replication.factor=1 + transaction.state.log.replication.factor=1 + transaction.state.log.min.isr=1 + log.retention.hours=168 + log.segment.bytes=1073741824 + log.retention.check.interval.ms=300000 + ''; + + startKafka = pkgs.writeShellScriptBin "start-kafka" '' + set -e + + mkdir -p ${stateDir} + CLUSTER_ID=$(cat ${clusterIdFile} 2>/dev/null || ${cfg.package}/bin/kafka-storage.sh random-uuid | tee ${clusterIdFile}) + # If logs dir is empty, format the storage + if [ ! -d ${logsDir} ] || [ ! "$(ls -A ${logsDir})" ]; then + ${cfg.package}/bin/kafka-storage.sh format -t $CLUSTER_ID -c ${serverProperties} + fi + ${cfg.package}/bin/kafka-server-start.sh ${serverProperties} + ''; + in + lib.mkIf cfg.enable { + packages = [ cfg.package ]; + + # processes.kafka-setup.exec = '' + # ''; + processes.kafka.exec = "${startKafka}/bin/start-kafka"; + }; +} From 3c37851d77760b91293584f346005ce2fe80f09a Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Thu, 31 Oct 2024 14:25:42 +0100 Subject: [PATCH 02/13] feat: Added kafka connect service --- src/modules/services/kafka-connect.nix | 38 ++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 src/modules/services/kafka-connect.nix diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix new file mode 100644 index 000000000..7a759a170 --- /dev/null +++ b/src/modules/services/kafka-connect.nix @@ -0,0 +1,38 @@ +{ pkgs, lib, config, ... 
}: + +let + kafkaCfg = config.services.kafka; + cfg = config.services.kafka.connect; + types = lib.types; + +in +{ + options.services.kafka.connect = { + enable = lib.mkEnableOption "Kafka Connect"; + }; + + config = + let + pkg = kafkaCfg.package; + stateDir = config.env.DEVENV_STATE + "/kafka/connect"; + storageFile = stateDir + "/connect.offsets"; + + configFile = pkgs.writeText "connect-standalone.properties" '' + bootstrap.servers=localhost:9092 + key.converter=org.apache.kafka.connect.json.JsonConverter + value.converter=org.apache.kafka.connect.json.JsonConverter + key.converter.schemas.enable=true + value.converter.schemas.enable=true + offset.storage.file.filename=${storageFile} + offset.flush.interval.ms=10000 + ''; + + startKafkaConnect = pkgs.writeShellScriptBin "start-kafka-connect" '' + mkdir -p ${stateDir} + ${pkg}/bin/connect-standalone.sh ${configFile} + ''; + in + lib.mkIf cfg.enable (lib.mkIf kafkaCfg.enable { + processes.kafka-connect.exec = "${startKafkaConnect}/bin/start-kafka-connect"; + }); +} From 56b4c82d55de84a48a0a42f618edba97c06c92b2 Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Thu, 31 Oct 2024 15:56:02 +0100 Subject: [PATCH 03/13] feat(kafka-connect): Added plugins --- src/modules/services/kafka-connect.nix | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix index 7a759a170..a628d1454 100644 --- a/src/modules/services/kafka-connect.nix +++ b/src/modules/services/kafka-connect.nix @@ -9,6 +9,19 @@ in { options.services.kafka.connect = { enable = lib.mkEnableOption "Kafka Connect"; + + plugins = lib.mkOption { + type = types.listOf types.str; + default = [ ]; + description = '' + List of Kafka Connect plugins to install + The list should consist of top level directories that include any combination of: + a) directories immediately containing jars with plugins and their dependencies + b) uber-jars with plugins and their dependencies + 
c) directories immediately containing the package directory structure of classes of plugins and their dependencies + Note: symlinks will be followed to discover dependencies or plugins. + ''; + }; }; config = @@ -25,6 +38,7 @@ in value.converter.schemas.enable=true offset.storage.file.filename=${storageFile} offset.flush.interval.ms=10000 + ${lib.optionalString (lib.lists.length cfg.plugins <= 0) "plugin.path=${lib.concatStringsSep "," cfg.plugins}"} ''; startKafkaConnect = pkgs.writeShellScriptBin "start-kafka-connect" '' From 6779d30607d110361ea6cc2789e3ff6c31348afb Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Thu, 31 Oct 2024 17:28:04 +0100 Subject: [PATCH 04/13] feat(kafka-connect): Added initial connectors --- src/modules/services/kafka-connect.nix | 60 ++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix index a628d1454..40954e2c4 100644 --- a/src/modules/services/kafka-connect.nix +++ b/src/modules/services/kafka-connect.nix @@ -10,11 +10,20 @@ in options.services.kafka.connect = { enable = lib.mkEnableOption "Kafka Connect"; - plugins = lib.mkOption { + listeners = lib.mkOption { + type = types.listOf types.str; + default = [ ]; + description = '' + List of listeners for Kafka Connect + (By default Kafka Connect listens on http://localhost:8083) + ''; + example = [ "http://localhost:8080" ]; + }; + + pluginDirectories = lib.mkOption { type = types.listOf types.str; default = [ ]; description = '' - List of Kafka Connect plugins to install The list should consist of top level directories that include any combination of: a) directories immediately containing jars with plugins and their dependencies b) uber-jars with plugins and their dependencies @@ -22,6 +31,29 @@ in Note: symlinks will be followed to discover dependencies or plugins. 
''; }; + + initialConnectors = lib.mkOption { + type = types.listOf (types.submodule { + options = { + name = lib.mkOption { + type = types.str; + description = '' + Name of the connector + ''; + }; + config = lib.mkOption { + type = types.attrs; + description = '' + Initial configuration for the connector + ''; + }; + }; + }); + default = [ ]; + description = '' + List of Kafka Connect connectors to set up initially + ''; + }; }; config = @@ -38,15 +70,37 @@ in value.converter.schemas.enable=true offset.storage.file.filename=${storageFile} offset.flush.interval.ms=10000 - ${lib.optionalString (lib.lists.length cfg.plugins <= 0) "plugin.path=${lib.concatStringsSep "," cfg.plugins}"} + ${lib.optionalString (lib.lists.length cfg.listeners > 0) "listeners=${lib.concatStringsSep "," cfg.listeners}"} + ${lib.optionalString (lib.lists.length cfg.pluginDirectories > 0) "plugin.path=${lib.concatStringsSep "," cfg.pluginDirectories}"} ''; startKafkaConnect = pkgs.writeShellScriptBin "start-kafka-connect" '' mkdir -p ${stateDir} ${pkg}/bin/connect-standalone.sh ${configFile} ''; + + firstListener = if lib.length cfg.listeners > 0 then (lib.lists.head cfg.listeners) else "http://localhost:8083"; + # initialConnectorNames = lib.lists.map (c: c.name) cfg.initialConnectors; + + /** + * @param {string} connector config as JSON string + */ + setupConnector = pkgs.writeShellScriptBin "setup-connector" '' + ${pkgs.curl}/bin/curl \ + -X POST \ + -H "Content-Type: application/json" \ + --data $1 \ + ${firstListener}/connectors + ''; + + setupConnectorsCommands = lib.lists.map (c: "${setupConnector}/bin/setup-connector '${builtins.toJSON c}'") cfg.initialConnectors; + setupConnectors = pkgs.writeShellScriptBin "setup-connectors" '' + ${lib.concatStringsSep "\n" setupConnectorsCommands} + ''; in lib.mkIf cfg.enable (lib.mkIf kafkaCfg.enable { processes.kafka-connect.exec = "${startKafkaConnect}/bin/start-kafka-connect"; + # TODO: Make this process run when kafka connect is ready + 
processes.kafka-connect-setup.exec = "${setupConnectors}/bin/setup-connectors"; }); } From c88fd4371ecf7c7adef7ca7ceaa1ec575598421c Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Thu, 31 Oct 2024 17:37:39 +0100 Subject: [PATCH 05/13] fix(kafka-connect): Setup connectors --- src/modules/services/kafka-connect.nix | 27 +++++--------------------- 1 file changed, 5 insertions(+), 22 deletions(-) diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix index 40954e2c4..ae624adc9 100644 --- a/src/modules/services/kafka-connect.nix +++ b/src/modules/services/kafka-connect.nix @@ -74,33 +74,16 @@ in ${lib.optionalString (lib.lists.length cfg.pluginDirectories > 0) "plugin.path=${lib.concatStringsSep "," cfg.pluginDirectories}"} ''; + # Create a json file for each connector + connectorFiles = lib.lists.map (c: pkgs.writeText "connector.json" (builtins.toJSON c)) cfg.initialConnectors; + connectorFilesConcatted = lib.concatStringsSep " " connectorFiles; + startKafkaConnect = pkgs.writeShellScriptBin "start-kafka-connect" '' mkdir -p ${stateDir} - ${pkg}/bin/connect-standalone.sh ${configFile} - ''; - - firstListener = if lib.length cfg.listeners > 0 then (lib.lists.head cfg.listeners) else "http://localhost:8083"; - # initialConnectorNames = lib.lists.map (c: c.name) cfg.initialConnectors; - - /** - * @param {string} connector config as JSON string - */ - setupConnector = pkgs.writeShellScriptBin "setup-connector" '' - ${pkgs.curl}/bin/curl \ - -X POST \ - -H "Content-Type: application/json" \ - --data $1 \ - ${firstListener}/connectors - ''; - - setupConnectorsCommands = lib.lists.map (c: "${setupConnector}/bin/setup-connector '${builtins.toJSON c}'") cfg.initialConnectors; - setupConnectors = pkgs.writeShellScriptBin "setup-connectors" '' - ${lib.concatStringsSep "\n" setupConnectorsCommands} + ${pkg}/bin/connect-standalone.sh ${configFile} ${connectorFilesConcatted} ''; in lib.mkIf cfg.enable (lib.mkIf kafkaCfg.enable { 
processes.kafka-connect.exec = "${startKafkaConnect}/bin/start-kafka-connect"; - # TODO: Make this process run when kafka connect is ready - processes.kafka-connect-setup.exec = "${setupConnectors}/bin/setup-connectors"; }); } From 08b1b6265a02cb3238571eaff928fe47950cd1b7 Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Fri, 1 Nov 2024 10:57:38 +0100 Subject: [PATCH 06/13] feat(kafka): Updated interface and startup to match NixOS module Also added defaults to run in kraft mode --- src/modules/services/kafka.nix | 245 ++++++++++++++++++++++++++------- 1 file changed, 194 insertions(+), 51 deletions(-) diff --git a/src/modules/services/kafka.nix b/src/modules/services/kafka.nix index 105ee1415..8c1cc6c33 100644 --- a/src/modules/services/kafka.nix +++ b/src/modules/services/kafka.nix @@ -4,79 +4,222 @@ let cfg = config.services.kafka; types = lib.types; + stateDir = config.env.DEVENV_STATE + "/kafka"; + + mkPropertyString = + let + render = { + bool = lib.boolToString; + int = toString; + list = lib.concatMapStringsSep "," mkPropertyString; + string = lib.id; + }; + in + v: render.${builtins.typeOf v} v; + + stringlySettings = lib.mapAttrs (_: mkPropertyString) + (lib.filterAttrs (_: v: v != null) cfg.settings); + + generator = (pkgs.formats.javaProperties { }).generate; in { options.services.kafka = { enable = lib.mkEnableOption "Apache Kafka"; - package = lib.mkOption { - type = types.package; - description = "Which Apache Kafka package to use"; - default = pkgs.apacheKafka; - defaultText = "pkgs.apacheKafka"; + defaultMode = lib.mkOption { + description = '' + Which defaults to set for the mode Kafka should run in + - `kraft` (default): Run Kafka in KRaft mode, Which requires no extra configuration. + - `zookeeper`: Run Kafka in Zookeeper mode, this requires more configuration. 
+ ''; + default = "kraft"; + type = lib.types.enum [ "zookeeper" "kraft" ]; + }; + + settings = lib.mkOption { + description = '' + [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs) + {file}`server.properties`. + + Note that .properties files contain mappings from string to string. + Keys with dots are NOT represented by nested attrs in these settings, + but instead as quoted strings (ie. `settings."broker.id"`, NOT + `settings.broker.id`). + ''; + default = { }; + type = lib.types.submodule { + freeformType = with lib.types; let + primitive = oneOf [ bool int str ]; + in + lazyAttrsOf (nullOr (either primitive (listOf primitive))); + + options = { + "broker.id" = lib.mkOption { + description = "Broker ID. -1 or null to auto-allocate in zookeeper mode."; + default = null; + type = with lib.types; nullOr int; + }; + + "log.dirs" = lib.mkOption { + description = "Log file directories."; + # Deliberately leave out old default and use the rewrite opportunity + # to have users choose a safer value -- /tmp might be volatile and is a + # slightly scary default choice. + default = [ "${stateDir}/logs" ]; + type = with lib.types; listOf path; + }; + + "listeners" = lib.mkOption { + description = '' + Kafka Listener List. + See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners). + ''; + type = lib.types.listOf lib.types.str; + default = [ "PLAINTEXT://localhost:9092" ]; + }; + }; + }; + }; + + configFiles.serverProperties = lib.mkOption { + description = '' + Kafka server.properties configuration file path. + Defaults to the rendered `settings`.
+ ''; + type = lib.types.path; + }; + + configFiles.log4jProperties = lib.mkOption { + description = "Kafka log4j property configuration file path"; + type = lib.types.path; + default = pkgs.writeText "log4j.properties" cfg.log4jProperties; + defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties''; }; - # listenPort = lib.mkOption { - # description = "Kafka port to listen on."; - # default = 9092; - # type = types.port; - # }; + formatLogDirs = lib.mkOption { + description = '' + Whether to format log dirs in KRaft mode if all log dirs are + unformatted, ie. they contain no meta.properties. + ''; + type = lib.types.bool; + default = true; + }; - # config = lib.mkOption { - # type = types.attrs; - # default = {}; - # }; + formatLogDirsIgnoreFormatted = lib.mkOption { + description = '' + Whether to ignore already formatted log dirs when formatting log dirs, + instead of failing. Useful when replacing or adding disks. + ''; + type = lib.types.bool; + default = true; + }; + + log4jProperties = lib.mkOption { + description = "Kafka log4j property configuration."; + default = '' + log4j.rootLogger=INFO, stdout + + log4j.appender.stdout=org.apache.log4j.ConsoleAppender + log4j.appender.stdout.layout=org.apache.log4j.PatternLayout + log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + ''; + type = lib.types.lines; + }; + + jvmOptions = lib.mkOption { + description = "Extra command line options for the JVM running Kafka."; + default = [ ]; + type = lib.types.listOf lib.types.str; + example = [ + "-Djava.net.preferIPv4Stack=true" + "-Dcom.sun.management.jmxremote" + "-Dcom.sun.management.jmxremote.local.only=true" + ]; + }; + + package = lib.mkPackageOption pkgs "apacheKafka" { }; + + jre = lib.mkOption { + description = "The JRE with which to run Kafka"; + default = cfg.package.passthru.jre; + defaultText = lib.literalExpression "pkgs.apacheKafka.passthru.jre"; + type = lib.types.package; + }; }; config = let # From config file example - 
stateDir = config.env.DEVENV_STATE + "/kafka"; clusterIdFile = stateDir + "/clusterid"; logsDir = stateDir + "/logs"; - # TODO: Make these options configurable - serverProperties = pkgs.writeText "server.properties" '' - process.roles=broker,controller - node.id=1 - controller.quorum.voters=1@localhost:9093 - listeners=PLAINTEXT://:9092,CONTROLLER://:9093 - inter.broker.listener.name=PLAINTEXT - advertised.listeners=PLAINTEXT://localhost:9092 - controller.listener.names=CONTROLLER - listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL - num.network.threads=3 - num.io.threads=8 - socket.send.buffer.bytes=102400 - socket.receive.buffer.bytes=102400 - socket.request.max.bytes=104857600 - log.dir=${logsDir} - num.partitions=1 - num.recovery.threads.per.data.dir=1 - offsets.topic.replication.factor=1 - transaction.state.log.replication.factor=1 - transaction.state.log.min.isr=1 - log.retention.hours=168 - log.segment.bytes=1073741824 - log.retention.check.interval.ms=300000 + + getOrGenerateClusterId = '' + CLUSTER_ID=$(cat ${clusterIdFile} 2>/dev/null || ${cfg.package}/bin/kafka-storage.sh random-uuid | tee ${clusterIdFile}) ''; + formatLogDirsScript = pkgs.writeShellScriptBin "format-log-dirs" + (if cfg.formatLogDirsIgnoreFormatted then '' + ${getOrGenerateClusterId} + ${cfg.package}/bin/kafka-storage.sh format -t "$CLUSTER_ID" -c ${cfg.configFiles.serverProperties} --ignore-formatted + '' else '' + if ${lib.concatMapStringsSep " && " (l: ''[ ! 
-f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then + ${getOrGenerateClusterId} + ${cfg.package}/bin/kafka-storage.sh format -t "$CLUSTER_ID" -c ${cfg.configFiles.serverProperties} + fi + ''); + startKafka = pkgs.writeShellScriptBin "start-kafka" '' set -e mkdir -p ${stateDir} - CLUSTER_ID=$(cat ${clusterIdFile} 2>/dev/null || ${cfg.package}/bin/kafka-storage.sh random-uuid | tee ${clusterIdFile}) - # If logs dir is empty, format the storage - if [ ! -d ${logsDir} ] || [ ! "$(ls -A ${logsDir})" ]; then - ${cfg.package}/bin/kafka-storage.sh format -t $CLUSTER_ID -c ${serverProperties} - fi - ${cfg.package}/bin/kafka-server-start.sh ${serverProperties} + ${formatLogDirsScript}/bin/format-log-dirs + + ${cfg.jre}/bin/java \ + -cp "${cfg.package}/libs/*" \ + -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \ + ${toString cfg.jvmOptions} \ + kafka.Kafka \ + ${cfg.configFiles.serverProperties} ''; in - lib.mkIf cfg.enable { - packages = [ cfg.package ]; + lib.mkMerge [ + (lib.mkIf (cfg.mode == "kraft") { + services.kafka.settings = { + "process.roles" = lib.mkDefault [ "broker" "controller" ]; + "broker.id" = lib.mkDefault 1; + "controller.quorum.voters" = lib.mkDefault "1@localhost:9093"; + "listeners" = lib.mkDefault [ "PLAINTEXT://localhost:9092" "CONTROLLER://localhost:9093" ]; + "inter.broker.listener.name" = lib.mkDefault "PLAINTEXT"; + "advertised.listeners" = lib.mkDefault [ "PLAINTEXT://localhost:9092" ]; + "controller.listener.names" = lib.mkDefault [ "CONTROLLER" ]; + "listener.security.protocol.map" = lib.mkDefault [ + "CONTROLLER:PLAINTEXT" + "PLAINTEXT:PLAINTEXT" + "SSL:SSL" + "SASL_PLAINTEXT:SASL_PLAINTEXT" + "SASL_SSL:SASL_SSL" + ]; - # processes.kafka-setup.exec = '' - # ''; - processes.kafka.exec = "${startKafka}/bin/start-kafka"; - }; + "num.network.threads" = lib.mkDefault 3; + "num.io.threads" = lib.mkDefault 8; + "socket.send.buffer.bytes" = lib.mkDefault 102400; + "socket.receive.buffer.bytes" = lib.mkDefault 102400; + 
"socket.request.max.bytes" = lib.mkDefault 104857600; + + "num.partitions" = lib.mkDefault 1; + "num.recovery.threads.per.data.dir" = lib.mkDefault 1; + "offsets.topic.replication.factor" = lib.mkDefault 1; + "transaction.state.log.replication.factor" = lib.mkDefault 1; + "transaction.state.log.min.isr" = lib.mkDefault 1; + }; + }) + (lib.mkIf cfg.enable { + packages = [ cfg.package ]; + services.kafka.configFiles.serverProperties = generator "server.properties" stringlySettings; + + # processes.kafka-setup.exec = '' + # ''; + processes.kafka.exec = "${startKafka}/bin/start-kafka"; + }) + ]; } From 68136e5c3c1c5154a3ca845ae993e50c698a8ca7 Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Fri, 1 Nov 2024 12:53:35 +0100 Subject: [PATCH 07/13] fix(kafka): Added more defaults --- src/modules/services/kafka.nix | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/modules/services/kafka.nix b/src/modules/services/kafka.nix index 8c1cc6c33..250e8b061 100644 --- a/src/modules/services/kafka.nix +++ b/src/modules/services/kafka.nix @@ -73,6 +73,7 @@ in description = '' Kafka Listener List. See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners). + If you change this, you should also update the readiness probe. 
''; type = lib.types.listOf lib.types.str; default = [ "PLAINTEXT://localhost:9092" ]; @@ -183,7 +184,7 @@ in ''; in lib.mkMerge [ - (lib.mkIf (cfg.mode == "kraft") { + (lib.mkIf (cfg.defaultMode == "kraft") { services.kafka.settings = { "process.roles" = lib.mkDefault [ "broker" "controller" ]; "broker.id" = lib.mkDefault 1; @@ -211,15 +212,36 @@ in "offsets.topic.replication.factor" = lib.mkDefault 1; "transaction.state.log.replication.factor" = lib.mkDefault 1; "transaction.state.log.min.isr" = lib.mkDefault 1; + + "log.retention.hours" = lib.mkDefault 168; + "log.segment.bytes" = lib.mkDefault 1073741824; + "log.retention.check.interval.ms" = lib.mkDefault 300000; }; }) (lib.mkIf cfg.enable { packages = [ cfg.package ]; services.kafka.configFiles.serverProperties = generator "server.properties" stringlySettings; - # processes.kafka-setup.exec = '' - # ''; - processes.kafka.exec = "${startKafka}/bin/start-kafka"; + processes.kafka = { + exec = "${startKafka}/bin/start-kafka"; + + # process-compose = { + # readiness_probe = { + # # exec.command = "${pkgs.curl}/bin/curl -f -k http://localhost:9092/topics"; + # http_get = { + # host = "localhost"; + # scheme = "http"; + # path = "/topics"; + # port = 9092; + # }; + # initial_delay_seconds = 5; + # period_seconds = 10; + # timeout_seconds = 5; + # success_threshold = 1; + # failure_threshold = 3; + # }; + # }; + }; }) ]; } From 9bbf08ebaba635dd69c4d23c089e5180ff57b48d Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Fri, 1 Nov 2024 12:53:51 +0100 Subject: [PATCH 08/13] feat(kafka-connect): Added settings so it can be configured like kafka --- src/modules/services/kafka-connect.nix | 164 +++++++++++++++++++------ 1 file changed, 125 insertions(+), 39 deletions(-) diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix index ae624adc9..5e9b9bbea 100644 --- a/src/modules/services/kafka-connect.nix +++ b/src/modules/services/kafka-connect.nix @@ -5,35 +5,41 @@ let cfg = 
config.services.kafka.connect; types = lib.types; + stateDir = config.env.DEVENV_STATE + "/kafka/connect"; + + storageFile = stateDir + "/connect.offsets"; + + mkPropertyString = + let + render = { + bool = lib.boolToString; + int = toString; + list = lib.concatMapStringsSep "," mkPropertyString; + string = lib.id; + }; + in + v: render.${builtins.typeOf v} v; + + stringlyGeneric = (attrs: + lib.mapAttrs (_: mkPropertyString) + (lib.filterAttrs (_: v: v != null) attrs) + ); + + stringlySettings = stringlyGeneric cfg.settings; + + generator = (pkgs.formats.javaProperties { }).generate; in { options.services.kafka.connect = { enable = lib.mkEnableOption "Kafka Connect"; - listeners = lib.mkOption { - type = types.listOf types.str; - default = [ ]; - description = '' - List of listeners for Kafka Connect - (By default Kafka Connect listens on http://localhost:8083) - ''; - example = [ "http://localhost:8080" ]; - }; - - pluginDirectories = lib.mkOption { - type = types.listOf types.str; - default = [ ]; - description = '' - The list should consist of top level directories that include any combination of: - a) directories immediately containing jars with plugins and their dependencies - b) uber-jars with plugins and their dependencies - c) directories immediately containing the package directory structure of classes of plugins and their dependencies - Note: symlinks will be followed to discover dependencies or plugins. - ''; - }; - initialConnectors = lib.mkOption { type = types.listOf (types.submodule { + freeformType = with lib.types; let + primitive = oneOf [ bool int str ]; + in + lazyAttrsOf (nullOr (either primitive (listOf primitive))); + options = { name = lib.mkOption { type = types.str; @@ -54,27 +60,107 @@ in List of Kafka Connect connectors to set up initially ''; }; + + settings = lib.mkOption { + description = '' + {file}`connect-standalone.properties`. + + Note that .properties files contain mappings from string to string. 
+ Keys with dots are NOT represented by nested attrs in these settings, + but instead as quoted strings (ie. `settings."broker.id"`, NOT + `settings.broker.id`). + ''; + default = { }; + type = lib.types.submodule { + freeformType = with lib.types; let + primitive = oneOf [ bool int str ]; + in + lazyAttrsOf (nullOr (either primitive (listOf primitive))); + + options = { + "listeners" = lib.mkOption { + type = types.nullOr (types.listOf types.str); + default = null; + description = '' + List of listeners for Kafka Connect + (By default Kafka Connect listens on http://localhost:8083) + ''; + example = [ "http://localhost:8080" ]; + }; + + "bootstrap.servers" = lib.mkOption { + type = types.listOf types.str; + description = '' + A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. + ''; + default = [ "localhost:9092" ]; + }; + + "plugin.path" = lib.mkOption { + type = types.nullOr (types.listOf (types.either types.str types.path)); + description = '' + The list should consist of top level directories that include any combination of: + a) directories immediately containing jars with plugins and their dependencies + b) uber-jars with plugins and their dependencies + c) directories immediately containing the package directory structure of classes of plugins and their dependencies + Note: symlinks will be followed to discover dependencies or plugins. + ''; + }; + + "offset.storage.file.filename" = lib.mkOption { + type = types.str; + default = storageFile; + }; + + "offset.flush.interval.ms" = lib.mkOption { + type = types.int; + default = 10000; + }; + + "key.converter" = lib.mkOption { + type = types.str; + default = "org.apache.kafka.connect.json.JsonConverter"; + description = '' + The key converter to use for the connector. + ''; + }; + + "value.converter" = lib.mkOption { + type = types.str; + default = "org.apache.kafka.connect.json.JsonConverter"; + description = '' + The value converter to use for the connector. 
+ ''; + }; + + "key.converter.schemas.enable" = lib.mkOption { + type = types.bool; + default = true; + description = '' + Whether the key converter should include schema information in the message. + ''; + }; + + "value.converter.schemas.enable" = lib.mkOption { + type = types.bool; + default = true; + description = '' + Whether the value converter should include schema information in the message. + ''; + }; + }; + }; + }; }; config = let pkg = kafkaCfg.package; - stateDir = config.env.DEVENV_STATE + "/kafka/connect"; - storageFile = stateDir + "/connect.offsets"; - - configFile = pkgs.writeText "connect-standalone.properties" '' - bootstrap.servers=localhost:9092 - key.converter=org.apache.kafka.connect.json.JsonConverter - value.converter=org.apache.kafka.connect.json.JsonConverter - key.converter.schemas.enable=true - value.converter.schemas.enable=true - offset.storage.file.filename=${storageFile} - offset.flush.interval.ms=10000 - ${lib.optionalString (lib.lists.length cfg.listeners > 0) "listeners=${lib.concatStringsSep "," cfg.listeners}"} - ${lib.optionalString (lib.lists.length cfg.pluginDirectories > 0) "plugin.path=${lib.concatStringsSep "," cfg.pluginDirectories}"} - ''; - # Create a json file for each connector + configFile = generator "connect-standalone.properties" stringlySettings; + + # TODO: make it work with .properties files? 
+ # connectorFiles = lib.lists.map (c: generator "connector-${c.name}.properties" (stringlyGeneric c)) cfg.initialConnectors; connectorFiles = lib.lists.map (c: pkgs.writeText "connector.json" (builtins.toJSON c)) cfg.initialConnectors; connectorFilesConcatted = lib.concatStringsSep " " connectorFiles; @@ -83,7 +169,7 @@ in ${pkg}/bin/connect-standalone.sh ${configFile} ${connectorFilesConcatted} ''; in - lib.mkIf cfg.enable (lib.mkIf kafkaCfg.enable { + (lib.mkIf cfg.enable (lib.mkIf kafkaCfg.enable { processes.kafka-connect.exec = "${startKafkaConnect}/bin/start-kafka-connect"; - }); + })); } From fb4dab3c7addd06b8f2546015270f85abd332138 Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Fri, 1 Nov 2024 13:00:49 +0100 Subject: [PATCH 09/13] chore(kafka): Added test --- tests/kafka/.test.sh | 3 +++ tests/kafka/devenv.nix | 6 ++++++ 2 files changed, 9 insertions(+) create mode 100755 tests/kafka/.test.sh create mode 100644 tests/kafka/devenv.nix diff --git a/tests/kafka/.test.sh b/tests/kafka/.test.sh new file mode 100755 index 000000000..479877474 --- /dev/null +++ b/tests/kafka/.test.sh @@ -0,0 +1,3 @@ +set -e + +kafka-topics.sh --list --bootstrap-server localhost:9092 diff --git a/tests/kafka/devenv.nix b/tests/kafka/devenv.nix new file mode 100644 index 000000000..498076d57 --- /dev/null +++ b/tests/kafka/devenv.nix @@ -0,0 +1,6 @@ +{ pkgs, ... 
}: +{ + services.kafka = { + enable = true; + }; +} From d80734ef7867a0281a4d76b6d6a95b94aa3aa25b Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Mon, 11 Nov 2024 10:13:50 +0100 Subject: [PATCH 10/13] chore(kafka): Added more descriptions --- src/modules/services/kafka-connect.nix | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix index 5e9b9bbea..6ea1e4b48 100644 --- a/src/modules/services/kafka-connect.nix +++ b/src/modules/services/kafka-connect.nix @@ -110,11 +110,17 @@ in "offset.storage.file.filename" = lib.mkOption { type = types.str; default = storageFile; + description = '' + The file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off. + ''; }; "offset.flush.interval.ms" = lib.mkOption { type = types.int; default = 10000; + description = '' + Interval at which to try committing offsets for tasks + ''; }; "key.converter" = lib.mkOption { From bc867b7799185647117abbd9bf31ffd055cc7c93 Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Fri, 15 Nov 2024 09:19:21 +0100 Subject: [PATCH 11/13] feat(kafka-connect): Added readiness check and test --- src/modules/services/kafka-connect.nix | 16 +++++++++++++++- tests/kafka-connect/.test.sh | 3 +++ tests/kafka-connect/devenv.nix | 10 ++++++++++ 3 files changed, 28 insertions(+), 1 deletion(-) create mode 100755 tests/kafka-connect/.test.sh create mode 100644 tests/kafka-connect/devenv.nix diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix index 6ea1e4b48..000c0800e 100644 --- a/src/modules/services/kafka-connect.nix +++ b/src/modules/services/kafka-connect.nix @@ -98,6 +98,7 @@ in "plugin.path" = lib.mkOption { type = types.nullOr (types.listOf (types.either types.str types.path)); + default = null; description = '' The list should consist of top level directories that 
include any combination of: a) directories immediately containing jars with plugins and their dependencies @@ -176,6 +177,19 @@ in ''; in (lib.mkIf cfg.enable (lib.mkIf kafkaCfg.enable { - processes.kafka-connect.exec = "${startKafkaConnect}/bin/start-kafka-connect"; + processes.kafka-connect = { + exec = "${startKafkaConnect}/bin/start-kafka-connect"; + + process-compose = { + readiness_probe = { + initial_delay_seconds = 2; + http_get = { + path = "/connectors"; + port = "8083"; + }; + }; + }; + }; + })); } diff --git a/tests/kafka-connect/.test.sh b/tests/kafka-connect/.test.sh new file mode 100755 index 000000000..e9690a205 --- /dev/null +++ b/tests/kafka-connect/.test.sh @@ -0,0 +1,3 @@ +set -e + +curl http://localhost:8083/connectors -H "Content-Type: application/json" diff --git a/tests/kafka-connect/devenv.nix b/tests/kafka-connect/devenv.nix new file mode 100644 index 000000000..8017851e0 --- /dev/null +++ b/tests/kafka-connect/devenv.nix @@ -0,0 +1,10 @@ +{ pkgs, ... }: + +{ + services.kafka = { + enable = true; + connect = { + enable = true; + }; + }; +} From 1ba9070038220d055ffbb5232797b5278b31abcc Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Fri, 15 Nov 2024 09:24:46 +0100 Subject: [PATCH 12/13] feat(kafka): Added readiness probe --- src/modules/services/kafka-connect.nix | 6 ++++++ src/modules/services/kafka.nix | 26 ++++++++++---------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix index 000c0800e..f18af9a10 100644 --- a/src/modules/services/kafka-connect.nix +++ b/src/modules/services/kafka-connect.nix @@ -188,6 +188,12 @@ in port = "8083"; }; }; + + depends_on = { + kafka = { + condition = "process_healthy"; + }; + }; }; }; diff --git a/src/modules/services/kafka.nix b/src/modules/services/kafka.nix index 250e8b061..c33868aae 100644 --- a/src/modules/services/kafka.nix +++ b/src/modules/services/kafka.nix @@ -225,22 +225,16 @@ in 
processes.kafka = { exec = "${startKafka}/bin/start-kafka"; - # process-compose = { - # readiness_probe = { - # # exec.command = "${pkgs.curl}/bin/curl -f -k http://localhost:9092/topics"; - # http_get = { - # host = "localhost"; - # scheme = "http"; - # path = "/topics"; - # port = 9092; - # }; - # initial_delay_seconds = 5; - # period_seconds = 10; - # timeout_seconds = 5; - # success_threshold = 1; - # failure_threshold = 3; - # }; - # }; + process-compose = { + readiness_probe = { + exec.command = "${cfg.package}/bin/kafka-topics.sh --list --bootstrap-server localhost:9092"; + initial_delay_seconds = 5; + period_seconds = 10; + timeout_seconds = 5; + success_threshold = 1; + failure_threshold = 3; + }; + }; }; }) ]; From 4e570684acee675da33a12ee686c92a5acc67c3b Mon Sep 17 00:00:00 2001 From: Alexander Nortung Date: Fri, 15 Nov 2024 09:44:46 +0100 Subject: [PATCH 13/13] chore(kafka): fix tests --- src/modules/services/kafka-connect.nix | 2 +- tests/kafka-connect/.test.sh | 7 ++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix index f18af9a10..04fe87539 100644 --- a/src/modules/services/kafka-connect.nix +++ b/src/modules/services/kafka-connect.nix @@ -185,7 +185,7 @@ in initial_delay_seconds = 2; http_get = { path = "/connectors"; - port = "8083"; + port = 8083; }; }; diff --git a/tests/kafka-connect/.test.sh b/tests/kafka-connect/.test.sh index e9690a205..0d947510a 100755 --- a/tests/kafka-connect/.test.sh +++ b/tests/kafka-connect/.test.sh @@ -1,3 +1,8 @@ set -e -curl http://localhost:8083/connectors -H "Content-Type: application/json" +curl --connect-timeout 5 \ + --max-time 5 \ + --retry 9 \ + --retry-delay 2 \ + --retry-all-errors \ + http://localhost:8083/connectors