diff --git a/src/modules/services/kafka-connect.nix b/src/modules/services/kafka-connect.nix
new file mode 100644
index 000000000..04fe87539
--- /dev/null
+++ b/src/modules/services/kafka-connect.nix
@@ -0,0 +1,207 @@
+# Kafka Connect service module: runs connect-standalone.sh from the services.kafka package.
+{ pkgs, lib, config, ... }:
+
+let
+  kafkaCfg = config.services.kafka;
+  cfg = config.services.kafka.connect;
+  types = lib.types;
+
+  stateDir = config.env.DEVENV_STATE + "/kafka/connect";
+
+  storageFile = stateDir + "/connect.offsets";
+
+  # Render one settings value as a .properties string; lists are comma-joined.
+  mkPropertyString =
+    let
+      render = {
+        bool = lib.boolToString;
+        int = toString;
+        list = lib.concatMapStringsSep "," mkPropertyString;
+        # "plugin.path" accepts Nix path values, whose typeOf is "path".
+        path = toString;
+        string = lib.id;
+      };
+    in
+    v: render.${builtins.typeOf v} v;
+
+  # Drop null-valued entries, then stringify every remaining value.
+  stringlyGeneric = (attrs:
+    lib.mapAttrs (_: mkPropertyString)
+      (lib.filterAttrs (_: v: v != null) attrs)
+  );
+
+  stringlySettings = stringlyGeneric cfg.settings;
+
+  generator = (pkgs.formats.javaProperties { }).generate;
+in
+{
+  options.services.kafka.connect = {
+    enable = lib.mkEnableOption "Kafka Connect";
+
+    initialConnectors = lib.mkOption {
+      type = types.listOf (types.submodule {
+        freeformType = with lib.types; let
+          primitive = oneOf [ bool int str ];
+        in
+          lazyAttrsOf (nullOr (either primitive (listOf primitive)));
+
+        options = {
+          name = lib.mkOption {
+            type = types.str;
+            description = ''
+              Name of the connector
+            '';
+          };
+          config = lib.mkOption {
+            type = types.attrs;
+            description = ''
+              Initial configuration for the connector
+            '';
+          };
+        };
+      });
+      default = [ ];
+      description = ''
+        List of Kafka Connect connectors to set up initially
+      '';
+    };
+
+    settings = lib.mkOption {
+      description = ''
+        {file}`connect-standalone.properties`.
+
+        Note that .properties files contain mappings from string to string.
+        Keys with dots are NOT represented by nested attrs in these settings,
+        but instead as quoted strings (ie. `settings."broker.id"`, NOT
+        `settings.broker.id`).
+      '';
+      default = { };
+      type = lib.types.submodule {
+        freeformType = with lib.types; let
+          primitive = oneOf [ bool int str ];
+        in
+          lazyAttrsOf (nullOr (either primitive (listOf primitive)));
+
+        options = {
+          "listeners" = lib.mkOption {
+            type = types.nullOr (types.listOf types.str);
+            default = null;
+            description = ''
+              List of listeners for Kafka Connect
+              (By default Kafka Connect listens on http://localhost:8083)
+            '';
+            example = [ "http://localhost:8080" ];
+          };
+
+          "bootstrap.servers" = lib.mkOption {
+            type = types.listOf types.str;
+            description = ''
+              A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
+            '';
+            default = [ "localhost:9092" ];
+          };
+
+          "plugin.path" = lib.mkOption {
+            type = types.nullOr (types.listOf (types.either types.str types.path));
+            default = null;
+            description = ''
+              The list should consist of top level directories that include any combination of:
+              a) directories immediately containing jars with plugins and their dependencies
+              b) uber-jars with plugins and their dependencies
+              c) directories immediately containing the package directory structure of classes of plugins and their dependencies
+              Note: symlinks will be followed to discover dependencies or plugins.
+            '';
+          };
+
+          "offset.storage.file.filename" = lib.mkOption {
+            type = types.str;
+            default = storageFile;
+            description = ''
+              The file to store connector offsets in. By storing offsets on disk, a standalone process can be stopped and started on a single node and resume where it previously left off.
+            '';
+          };
+
+          "offset.flush.interval.ms" = lib.mkOption {
+            type = types.int;
+            default = 10000;
+            description = ''
+              Interval at which to try committing offsets for tasks
+            '';
+          };
+
+          "key.converter" = lib.mkOption {
+            type = types.str;
+            default = "org.apache.kafka.connect.json.JsonConverter";
+            description = ''
+              The key converter to use for the connector.
+            '';
+          };
+
+          "value.converter" = lib.mkOption {
+            type = types.str;
+            default = "org.apache.kafka.connect.json.JsonConverter";
+            description = ''
+              The value converter to use for the connector.
+            '';
+          };
+
+          "key.converter.schemas.enable" = lib.mkOption {
+            type = types.bool;
+            default = true;
+            description = ''
+              Whether the key converter should include schema information in the message.
+            '';
+          };
+
+          "value.converter.schemas.enable" = lib.mkOption {
+            type = types.bool;
+            default = true;
+            description = ''
+              Whether the value converter should include schema information in the message.
+            '';
+          };
+        };
+      };
+    };
+  };
+
+  config =
+    let
+      pkg = kafkaCfg.package;
+
+      configFile = generator "connect-standalone.properties" stringlySettings;
+
+      # TODO: make it work with .properties files?
+      # connectorFiles = lib.lists.map (c: generator "connector-${c.name}.properties" (stringlyGeneric c)) cfg.initialConnectors;
+      connectorFiles = lib.lists.map (c: pkgs.writeText "connector.json" (builtins.toJSON c)) cfg.initialConnectors;
+      connectorFilesConcatted = lib.concatStringsSep " " connectorFiles;
+
+      startKafkaConnect = pkgs.writeShellScriptBin "start-kafka-connect" ''
+        mkdir -p "${stateDir}"
+        ${pkg}/bin/connect-standalone.sh ${configFile} ${connectorFilesConcatted}
+      '';
+    in
+    # Only defined when both Kafka Connect and the Kafka service are enabled.
+    lib.mkIf (cfg.enable && kafkaCfg.enable) {
+      processes.kafka-connect = {
+        exec = "${startKafkaConnect}/bin/start-kafka-connect";
+
+        process-compose = {
+          readiness_probe = {
+            initial_delay_seconds = 2;
+            http_get = {
+              path = "/connectors";
+              # NOTE(review): fixed port; not derived from settings.listeners.
+              port = 8083;
+            };
+          };
+
+          depends_on = {
+            kafka = {
+              condition = "process_healthy";
+            };
+          };
+        };
+      };
+    };
+}
diff --git a/src/modules/services/kafka.nix b/src/modules/services/kafka.nix
new file mode 100644
index 000000000..c33868aae
--- /dev/null
+++ b/src/modules/services/kafka.nix
@@ -0,0 +1,249 @@
+# Apache Kafka broker service module.
+{ pkgs, lib, config, ... }:
+
+let
+  cfg = config.services.kafka;
+  types = lib.types;
+
+  stateDir = config.env.DEVENV_STATE + "/kafka";
+
+  # Render one settings value as a .properties string; lists are comma-joined.
+  mkPropertyString =
+    let
+      render = {
+        bool = lib.boolToString;
+        int = toString;
+        list = lib.concatMapStringsSep "," mkPropertyString;
+        # Nix path values report typeOf "path".
+        path = toString;
+        string = lib.id;
+      };
+    in
+    v: render.${builtins.typeOf v} v;
+
+  # cfg.settings without null entries, every value stringified.
+  stringlySettings = lib.mapAttrs (_: mkPropertyString)
+    (lib.filterAttrs (_: v: v != null) cfg.settings);
+
+  generator = (pkgs.formats.javaProperties { }).generate;
+in
+{
+  options.services.kafka = {
+    enable = lib.mkEnableOption "Apache Kafka";
+
+    defaultMode = lib.mkOption {
+      description = ''
+        Which defaults to set for the mode Kafka should run in
+        - `kraft` (default): Run Kafka in KRaft mode, Which requires no extra configuration.
+        - `zookeeper`: Run Kafka in Zookeeper mode, this requires more configuration.
+      '';
+      default = "kraft";
+      type = lib.types.enum [ "zookeeper" "kraft" ];
+    };
+
+    settings = lib.mkOption {
+      description = ''
+        [Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
+        {file}`server.properties`.
+
+        Note that .properties files contain mappings from string to string.
+        Keys with dots are NOT represented by nested attrs in these settings,
+        but instead as quoted strings (ie. `settings."broker.id"`, NOT
+        `settings.broker.id`).
+      '';
+      default = { };
+      type = lib.types.submodule {
+        freeformType = with lib.types; let
+          primitive = oneOf [ bool int str ];
+        in
+          lazyAttrsOf (nullOr (either primitive (listOf primitive)));
+
+        options = {
+          "broker.id" = lib.mkOption {
+            description = "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
+            default = null;
+            type = with lib.types; nullOr int;
+          };
+
+          "log.dirs" = lib.mkOption {
+            description = "Log file directories.";
+            # Deliberately leave out old default and use the rewrite opportunity
+            # to have users choose a safer value -- /tmp might be volatile and is a
+            # slightly scary default choice.
+            default = [ "${stateDir}/logs" ];
+            type = with lib.types; listOf path;
+          };
+
+          "listeners" = lib.mkOption {
+            description = ''
+              Kafka Listener List.
+              See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
+              If you change this, you should also update the readiness probe.
+            '';
+            type = lib.types.listOf lib.types.str;
+            default = [ "PLAINTEXT://localhost:9092" ];
+          };
+        };
+      };
+    };
+
+    configFiles.serverProperties = lib.mkOption {
+      description = ''
+        Kafka server.properties configuration file path.
+        Defaults to the rendered `settings`.
+      '';
+      type = lib.types.path;
+    };
+
+    configFiles.log4jProperties = lib.mkOption {
+      description = "Kafka log4j property configuration file path";
+      type = lib.types.path;
+      default = pkgs.writeText "log4j.properties" cfg.log4jProperties;
+      defaultText = ''pkgs.writeText "log4j.properties" cfg.log4jProperties'';
+    };
+
+    formatLogDirs = lib.mkOption {
+      description = ''
+        Whether to format log dirs in KRaft mode if all log dirs are
+        unformatted, ie. they contain no meta.properties.
+      '';
+      type = lib.types.bool;
+      default = true;
+    };
+
+    formatLogDirsIgnoreFormatted = lib.mkOption {
+      description = ''
+        Whether to ignore already formatted log dirs when formatting log dirs,
+        instead of failing. Useful when replacing or adding disks.
+      '';
+      type = lib.types.bool;
+      default = true;
+    };
+
+    log4jProperties = lib.mkOption {
+      description = "Kafka log4j property configuration.";
+      default = ''
+        log4j.rootLogger=INFO, stdout
+
+        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+        log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+      '';
+      type = lib.types.lines;
+    };
+
+    jvmOptions = lib.mkOption {
+      description = "Extra command line options for the JVM running Kafka.";
+      default = [ ];
+      type = lib.types.listOf lib.types.str;
+      example = [
+        "-Djava.net.preferIPv4Stack=true"
+        "-Dcom.sun.management.jmxremote"
+        "-Dcom.sun.management.jmxremote.local.only=true"
+      ];
+    };
+
+    package = lib.mkPackageOption pkgs "apacheKafka" { };
+
+    jre = lib.mkOption {
+      description = "The JRE with which to run Kafka";
+      default = cfg.package.passthru.jre;
+      defaultText = lib.literalExpression "pkgs.apacheKafka.passthru.jre";
+      type = lib.types.package;
+    };
+  };
+
+  config =
+    let
+      # From config file example
+      clusterIdFile = stateDir + "/clusterid";
+
+      # Shell snippet: reuse the cluster id stored in clusterIdFile, else generate one and persist it.
+      getOrGenerateClusterId = ''
+        CLUSTER_ID=$(cat "${clusterIdFile}" 2>/dev/null || ${cfg.package}/bin/kafka-storage.sh random-uuid | tee "${clusterIdFile}")
+      '';
+
+      # Formats the log dirs via kafka-storage.sh; without --ignore-formatted it only runs when no log dir has meta.properties.
+      formatLogDirsScript = pkgs.writeShellScriptBin "format-log-dirs"
+        (if cfg.formatLogDirsIgnoreFormatted then ''
+          ${getOrGenerateClusterId}
+          ${cfg.package}/bin/kafka-storage.sh format -t "$CLUSTER_ID" -c ${cfg.configFiles.serverProperties} --ignore-formatted
+        '' else ''
+          if ${lib.concatMapStringsSep " && " (l: ''[ ! -f "${l}/meta.properties" ]'') cfg.settings."log.dirs"}; then
+            ${getOrGenerateClusterId}
+            ${cfg.package}/bin/kafka-storage.sh format -t "$CLUSTER_ID" -c ${cfg.configFiles.serverProperties}
+          fi
+        '');
+
+      # Launcher: create the state dir, format log dirs when cfg.formatLogDirs is set, then start the broker JVM.
+      startKafka = pkgs.writeShellScriptBin "start-kafka" ''
+        set -e
+
+        mkdir -p "${stateDir}"
+        ${lib.optionalString cfg.formatLogDirs "${formatLogDirsScript}/bin/format-log-dirs"}
+
+        ${cfg.jre}/bin/java \
+          -cp "${cfg.package}/libs/*" \
+          -Dlog4j.configuration=file:${cfg.configFiles.log4jProperties} \
+          ${toString cfg.jvmOptions} \
+          kafka.Kafka \
+          ${cfg.configFiles.serverProperties}
+      '';
+    in
+    lib.mkMerge [
+      # Overridable (mkDefault) settings applied when defaultMode is "kraft".
+      (lib.mkIf (cfg.defaultMode == "kraft") {
+        services.kafka.settings = {
+          "process.roles" = lib.mkDefault [ "broker" "controller" ];
+          "broker.id" = lib.mkDefault 1;
+          "controller.quorum.voters" = lib.mkDefault "1@localhost:9093";
+          "listeners" = lib.mkDefault [ "PLAINTEXT://localhost:9092" "CONTROLLER://localhost:9093" ];
+          "inter.broker.listener.name" = lib.mkDefault "PLAINTEXT";
+          "advertised.listeners" = lib.mkDefault [ "PLAINTEXT://localhost:9092" ];
+          "controller.listener.names" = lib.mkDefault [ "CONTROLLER" ];
+          "listener.security.protocol.map" = lib.mkDefault [
+            "CONTROLLER:PLAINTEXT"
+            "PLAINTEXT:PLAINTEXT"
+            "SSL:SSL"
+            "SASL_PLAINTEXT:SASL_PLAINTEXT"
+            "SASL_SSL:SASL_SSL"
+          ];
+
+          "num.network.threads" = lib.mkDefault 3;
+          "num.io.threads" = lib.mkDefault 8;
+          "socket.send.buffer.bytes" = lib.mkDefault 102400;
+          "socket.receive.buffer.bytes" = lib.mkDefault 102400;
+          "socket.request.max.bytes" = lib.mkDefault 104857600;
+
+          "num.partitions" = lib.mkDefault 1;
+          "num.recovery.threads.per.data.dir" = lib.mkDefault 1;
+          "offsets.topic.replication.factor" = lib.mkDefault 1;
+          "transaction.state.log.replication.factor" = lib.mkDefault 1;
+          "transaction.state.log.min.isr" = lib.mkDefault 1;
+
+          "log.retention.hours" = lib.mkDefault 168;
+          "log.segment.bytes" = lib.mkDefault 1073741824;
+          "log.retention.check.interval.ms" = lib.mkDefault 300000;
+        };
+      })
+      (lib.mkIf cfg.enable {
+        packages = [ cfg.package ];
+        services.kafka.configFiles.serverProperties = generator "server.properties" stringlySettings;
+
+        processes.kafka = {
+          exec = "${startKafka}/bin/start-kafka";
+
+          process-compose = {
+            readiness_probe = {
+              exec.command = "${cfg.package}/bin/kafka-topics.sh --list --bootstrap-server localhost:9092";
+              initial_delay_seconds = 5;
+              period_seconds = 10;
+              timeout_seconds = 5;
+              success_threshold = 1;
+              failure_threshold = 3;
+            };
+          };
+        };
+      })
+    ];
+}
diff --git a/tests/kafka-connect/.test.sh b/tests/kafka-connect/.test.sh
new file mode 100755
index 000000000..0d947510a
--- /dev/null
+++ b/tests/kafka-connect/.test.sh
@@ -0,0 +1,9 @@
+set -e
+
+curl --fail \
+  --connect-timeout 5 \
+  --max-time 5 \
+  --retry 9 \
+  --retry-delay 2 \
+  --retry-all-errors \
+  http://localhost:8083/connectors
diff --git a/tests/kafka-connect/devenv.nix b/tests/kafka-connect/devenv.nix
new file mode 100644
index 000000000..8017851e0
--- /dev/null
+++ b/tests/kafka-connect/devenv.nix
@@ -0,0 +1,10 @@
+{ pkgs, ... }:
+
+{
+  services.kafka = {
+    enable = true;
+    connect = {
+      enable = true;
+    };
+  };
+}
diff --git a/tests/kafka/.test.sh b/tests/kafka/.test.sh
new file mode 100755
index 000000000..479877474
--- /dev/null
+++ b/tests/kafka/.test.sh
@@ -0,0 +1,3 @@
+set -e
+
+kafka-topics.sh --list --bootstrap-server localhost:9092
diff --git a/tests/kafka/devenv.nix b/tests/kafka/devenv.nix
new file mode 100644
index 000000000..498076d57
--- /dev/null
+++ b/tests/kafka/devenv.nix
@@ -0,0 +1,6 @@
+{ pkgs, ... }:
+{
+  services.kafka = {
+    enable = true;
+  };
+}