From e4738a2a53343c1e66dbafa13209df19ed664d48 Mon Sep 17 00:00:00 2001 From: Richard <30619381+richard-axual@users.noreply.github.com> Date: Tue, 17 Sep 2024 18:08:09 +0200 Subject: [PATCH 1/2] Fix logic issue with Prometheus Export and ServiceMonitor (#152) * Update KSML ServiceMonitor to be disabled if Prometheus export is not configured. Added a warning in the nodes if ServiceMonitor is enabled, but Prometheus is not. Statefulset and service will only define metrics port if Prometheus is enabled Update example in Helm Readme, it used the wrong filename in the definition * Remove comment from example in Helm readme --- packaging/helm-charts/ksml/.helmignore | 1 + packaging/helm-charts/ksml/README.md | 6 ++---- packaging/helm-charts/ksml/templates/NOTES.txt | 6 +++++- packaging/helm-charts/ksml/templates/service.yaml | 2 ++ packaging/helm-charts/ksml/templates/servicemonitor.yaml | 2 +- packaging/helm-charts/ksml/templates/statefulset.yaml | 2 ++ packaging/helm-charts/ksml/values.yaml | 4 ++-- 7 files changed, 15 insertions(+), 8 deletions(-) diff --git a/packaging/helm-charts/ksml/.helmignore b/packaging/helm-charts/ksml/.helmignore index 73543e59..9acbc3cc 100644 --- a/packaging/helm-charts/ksml/.helmignore +++ b/packaging/helm-charts/ksml/.helmignore @@ -25,3 +25,4 @@ *.tgz *.tbz +workspace/ \ No newline at end of file diff --git a/packaging/helm-charts/ksml/README.md b/packaging/helm-charts/ksml/README.md index 548019b2..e04e7c74 100644 --- a/packaging/helm-charts/ksml/README.md +++ b/packaging/helm-charts/ksml/README.md @@ -153,8 +153,7 @@ resources: ksmlRunnerConfig: definitions: -# generate: generator.yaml - inspect: peek.yaml + generate: generator.yaml kafka: application.id: example.datagen @@ -266,8 +265,7 @@ resources: ksmlRunnerConfig: definitions: -# generate: generator.yaml - inspect: peek.yaml + generate: generator.yaml kafka: application.id: example.datagen diff --git a/packaging/helm-charts/ksml/templates/NOTES.txt b/packaging/helm-charts/ksml/templates/NOTES.txt index 9cf433c5..9c78ad43 100644 --- a/packaging/helm-charts/ksml/templates/NOTES.txt +++ b/packaging/helm-charts/ksml/templates/NOTES.txt @@ -1 +1,5 @@ -Thank you for installing {{ .Chart.Name }}-{{ .Chart.Version }} \ No newline at end of file +Thank you for installing {{ .Chart.Name }}-{{ .Chart.Version }} +{{ if and .Values.serviceMonitor.enabled (not .Values.prometheus.enabled) }} +The service monitor has been disabled because Prometheus is not enabled. +Use prometheus.enabled: true to enable the service monitor +{{- end }} diff --git a/packaging/helm-charts/ksml/templates/service.yaml b/packaging/helm-charts/ksml/templates/service.yaml index c0d20f54..53e98b30 100644 --- a/packaging/helm-charts/ksml/templates/service.yaml +++ b/packaging/helm-charts/ksml/templates/service.yaml @@ -11,9 +11,11 @@ spec: targetPort: http protocol: TCP name: http + {{- if .Values.prometheus.enabled }} - port: {{ .Values.prometheus.port }} targetPort: metrics protocol: TCP name: metrics + {{- end }} selector: {{- include "ksml.selectorLabels" . | nindent 4 }} diff --git a/packaging/helm-charts/ksml/templates/servicemonitor.yaml b/packaging/helm-charts/ksml/templates/servicemonitor.yaml index 7baefd10..e7f03ead 100644 --- a/packaging/helm-charts/ksml/templates/servicemonitor.yaml +++ b/packaging/helm-charts/ksml/templates/servicemonitor.yaml @@ -1,4 +1,4 @@ -{{- if and ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) .Values.serviceMonitor.enabled }} +{{- if and ( .Capabilities.APIVersions.Has "monitoring.coreos.com/v1" ) .Values.prometheus.enabled .Values.serviceMonitor.enabled }} apiVersion: monitoring.coreos.com/v1 kind: ServiceMonitor metadata: diff --git a/packaging/helm-charts/ksml/templates/statefulset.yaml b/packaging/helm-charts/ksml/templates/statefulset.yaml index 3848c3a0..dd19826e 100644 --- a/packaging/helm-charts/ksml/templates/statefulset.yaml +++ b/packaging/helm-charts/ksml/templates/statefulset.yaml @@ -42,9 +42,11 @@ spec: - name: http containerPort: {{ .Values.applicationServer.port }} protocol: TCP + {{- if .Values.prometheus.enabled }} - name: metrics containerPort: {{ .Values.prometheus.port }} protocol: TCP + {{- end }} {{- if .Values.startupProbe }} startupProbe: {{- toYaml .Values.startupProbe | nindent 12 }} diff --git a/packaging/helm-charts/ksml/values.yaml b/packaging/helm-charts/ksml/values.yaml index 0de74065..edc9c559 100644 --- a/packaging/helm-charts/ksml/values.yaml +++ b/packaging/helm-charts/ksml/values.yaml @@ -313,8 +313,8 @@ prometheus: serviceMonitor: # -- Enables creation of Prometheus Operator [ServiceMonitor](https://prometheus-operator.dev/docs/operator/api/#monitoring.coreos.com/v1.ServiceMonitor). - # Ignored if API `monitoring.coreos.com/v1` is not available. - enabled: true + # Ignored if API `monitoring.coreos.com/v1` is not available or if prometheus is disabled. + enabled: false # -- Interval at which metrics should be scraped. interval: 30s # -- Timeout after which the scrape is ended. From deb39b389a06dd8c478c556547bc3f8856e58a05 Mon Sep 17 00:00:00 2001 From: Jeroen van Disseldorp Date: Tue, 17 Sep 2024 18:24:51 +0200 Subject: [PATCH 2/2] Upgrade to Kafka 3.8.0 (#151) --- ksml-kafka-clients/NOTICE.txt | 12 +++---- .../ksml/client/admin/ResolvingAdmin.java | 35 ++++++++++--------- .../client/consumer/ForwardingConsumer.java | 10 +++--- .../client/producer/ForwardingProducer.java | 12 ++++--- .../client/producer/ResolvingProducer.java | 2 +- ksml/NOTICE.txt | 6 ++-- pom.xml | 2 +- 7 files changed, 43 insertions(+), 36 deletions(-) diff --git a/ksml-kafka-clients/NOTICE.txt b/ksml-kafka-clients/NOTICE.txt index 9776d03c..3c4a6dae 100644 --- a/ksml-kafka-clients/NOTICE.txt +++ b/ksml-kafka-clients/NOTICE.txt @@ -1,9 +1,9 @@ Lists of 20 third-party dependencies. - (The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.13.5 - http://github.com/FasterXML/jackson) - (The Apache Software License, Version 2.0) Jackson-core (com.fasterxml.jackson.core:jackson-core:2.13.5 - https://github.com/FasterXML/jackson-core) - (The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.13.5 - http://github.com/FasterXML/jackson) - (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni) + (The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.16.2 - https://github.com/FasterXML/jackson) + (The Apache Software License, Version 2.0) Jackson-core (com.fasterxml.jackson.core:jackson-core:2.16.2 - https://github.com/FasterXML/jackson-core) + (The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.16.2 - https://github.com/FasterXML/jackson) + (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.6-3 - https://github.com/luben/zstd-jni) (The Apache Software License, Version 2.0) FindBugs-jsr305 (com.google.code.findbugs:jsr305:3.0.2 - http://findbugs.sourceforge.net/) (Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.26.1 - https://errorprone.info/error_prone_annotations) (The Apache Software License, Version 2.0) Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.2 - https://github.com/google/guava/failureaccess) @@ -12,8 +12,8 @@ Lists of 20 third-party dependencies. (Apache License, Version 2.0) J2ObjC Annotations (com.google.j2objc:j2objc-annotations:3.0.0 - https://github.com/google/j2objc/) (Apache-2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.14.0 - https://commons.apache.org/proper/commons-lang/) (Apache-2.0) Apache Commons Text (org.apache.commons:commons-text:1.12.0 - https://commons.apache.org/proper/commons-text) - (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.6.2 - https://kafka.apache.org) - (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.6.2 - https://kafka.apache.org) + (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.8.0 - https://kafka.apache.org) + (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.8.0 - https://kafka.apache.org) (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.42.0 - https://checkerframework.org/) (The Apache Software License, Version 2.0) LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java) (The MIT License) Project Lombok (org.projectlombok:lombok:1.18.32 - https://projectlombok.org) diff --git a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/admin/ResolvingAdmin.java b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/admin/ResolvingAdmin.java index 57be8e6f..e7739e86 100644 --- a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/admin/ResolvingAdmin.java +++ b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/admin/ResolvingAdmin.java @@ -42,7 +42,7 @@ public class ResolvingAdmin extends ForwardingAdmin { public ResolvingAdmin(Map configs) { super(configs); - var config = new ResolvingClientConfig(configs); + final var config = new ResolvingClientConfig(configs); topicResolver = config.topicResolver; groupResolver = config.groupResolver; } @@ -58,7 +58,7 @@ public CreateTopicsResult createTopics(Collection newTopics, CreateTop @Override public DeleteTopicsResult deleteTopics(TopicCollection topicCollection, DeleteTopicsOptions options) { - var result = super.deleteTopics(topicResolver.resolve(topicCollection), options); + final var result = super.deleteTopics(topicResolver.resolve(topicCollection), options); return new ResolvingDeleteTopicsResult(result.topicIdValues(), result.topicNameValues(), topicResolver); } @@ -69,13 +69,13 @@ public ListTopicsResult listTopics(ListTopicsOptions options) { @Override public DescribeTopicsResult describeTopics(TopicCollection topicCollection, DescribeTopicsOptions options) { - var result = super.describeTopics(topicResolver.resolve(topicCollection), options); + final var result = super.describeTopics(topicResolver.resolve(topicCollection), options); return new ResolvingDescribeTopicsResult(result.topicIdValues(), result.topicNameValues(), topicResolver); } @Override public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) { - var result = super.describeAcls(ResolverUtil.resolve(filter, topicResolver, groupResolver), options); + final var result = super.describeAcls(ResolverUtil.resolve(filter, topicResolver, groupResolver), options); if (result == null) return null; return new ResolvingDescribeAclsResult(result.values(), topicResolver, groupResolver); } @@ -156,28 +156,31 @@ public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions opt @Override public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map groupSpecs, ListConsumerGroupOffsetsOptions options) { // Resolve the groupSpecs - var newGroupSpecs = new HashMap(); + final var newGroupSpecs = new HashMap(); groupSpecs.forEach((groupId, spec) -> newGroupSpecs.put(groupResolver.resolve(groupId), new ListConsumerGroupOffsetsSpec().topicPartitions(topicResolver.resolveTopicPartitions(spec.topicPartitions())))); // Resolve the options if (options != null) { - var newOptions = new ListConsumerGroupOffsetsOptions().requireStable(options.requireStable()); - if (options.topicPartitions() != null) { - var newTopicPartitions = new ArrayList(options.topicPartitions().size()); - options.topicPartitions().forEach(tp -> newTopicPartitions.add(topicResolver.resolve(tp))); + final var newOptions = new ListConsumerGroupOffsetsOptions().requireStable(options.requireStable()); + final var topicPartitions = options.topicPartitions(); + if (topicPartitions != null) { + final var newTopicPartitions = new ArrayList(topicPartitions.size()); + topicPartitions.forEach(tp -> newTopicPartitions.add(topicResolver.resolve(tp))); newOptions.topicPartitions(newTopicPartitions); } options = newOptions; } // Call the original API - var result = super.listConsumerGroupOffsets(newGroupSpecs, options); - // Convert the result to an unresolving result - var newResult = new HashMap>>(); + final var result = super.listConsumerGroupOffsets(newGroupSpecs, options); + + // Convert the result to an unresolved result + final var newResult = new HashMap>>(); newGroupSpecs.keySet().forEach(groupId -> { - var future = result.partitionsToOffsetAndMetadata(groupId); + final var future = result.partitionsToOffsetAndMetadata(groupId); newResult.put(CoordinatorKey.byGroupId(groupId), future); }); + return new ResolvingListConsumerGroupOffsetsResult(newResult, topicResolver, groupResolver); } @@ -262,9 +265,9 @@ public AlterClientQuotasResult alterClientQuotas(Collection resolveNewTopics(Collection newTopics) { // Resolve all new topics into a new collection - var resolvedTopics = new ArrayList(); - for (var newTopic : newTopics) { - var resolvedTopic = newTopic.replicasAssignments() == null + final var resolvedTopics = new ArrayList(); + for (final var newTopic : newTopics) { + final var resolvedTopic = newTopic.replicasAssignments() == null ? new NewTopic(topicResolver.resolve(newTopic.name()), newTopic.numPartitions(), newTopic.replicationFactor()) : new NewTopic(topicResolver.resolve(newTopic.name()), newTopic.replicasAssignments()); // Make sure that the config is added properly. Cleanup properties and timestamps are typical properties set in Streams diff --git a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/consumer/ForwardingConsumer.java b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/consumer/ForwardingConsumer.java index d6c7faa0..c37a5d23 100644 --- a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/consumer/ForwardingConsumer.java +++ b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/consumer/ForwardingConsumer.java @@ -21,10 +21,7 @@ */ import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.*; import java.time.Duration; import java.util.*; @@ -181,6 +178,11 @@ public Map committed(Set part return delegate.committed(partitions, timeout); } + @Override + public Uuid clientInstanceId(Duration duration) { + return delegate.clientInstanceId(duration); + } + @Override public Map metrics() { return delegate.metrics(); diff --git a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ForwardingProducer.java b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ForwardingProducer.java index a9eebe27..b23133bc 100644 --- a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ForwardingProducer.java +++ b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ForwardingProducer.java @@ -4,7 +4,7 @@ * ========================LICENSE_START================================= * Extended Kafka clients for KSML * %% - * Copyright (C) 2021 - 2023 Axual B.V. + * Copyright (C) 2021 - 2024 Axual B.V. * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,10 +26,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.*; import org.apache.kafka.common.errors.ProducerFencedException; import java.time.Duration; @@ -106,6 +103,11 @@ public List partitionsFor(String topic) { return delegate.metrics(); } + @Override + public Uuid clientInstanceId(Duration duration) { + return delegate.clientInstanceId(duration); + } + @Override public void close() { delegate.close(); diff --git a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ResolvingProducer.java b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ResolvingProducer.java index 817780ad..7c6716a9 100644 --- a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ResolvingProducer.java +++ b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ResolvingProducer.java @@ -54,11 +54,11 @@ private static RecordMetadata convertRecordMetadata(RecordMetadata input, String input.offset(), 0, input.timestamp(), - null, input.serializedKeySize(), input.serializedValueSize()); } + @Deprecated @Override public void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException { Map newOffsets = new HashMap<>(); diff --git a/ksml/NOTICE.txt b/ksml/NOTICE.txt index 885f149f..ae2728d2 100644 --- a/ksml/NOTICE.txt +++ b/ksml/NOTICE.txt @@ -8,7 +8,7 @@ Lists of 72 third-party dependencies. (The Apache Software License, Version 2.0) Jackson-dataformat-XML (com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.17.1 - https://github.com/FasterXML/jackson-dataformat-xml) (The Apache Software License, Version 2.0) Jackson-dataformat-YAML (com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.17.1 - https://github.com/FasterXML/jackson-dataformats-text) (The Apache License, Version 2.0) Woodstox (com.fasterxml.woodstox:woodstox-core:6.6.2 - https://github.com/FasterXML/woodstox) - (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni) + (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.6-3 - https://github.com/luben/zstd-jni) (The Apache Software License, Version 2.0) FindBugs-jsr305 (com.google.code.findbugs:jsr305:3.0.2 - http://findbugs.sourceforge.net/) (Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.26.1 - https://errorprone.info/error_prone_annotations) (The Apache Software License, Version 2.0) Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.2 - https://github.com/google/guava/failureaccess) @@ -39,8 +39,8 @@ Lists of 72 third-party dependencies. (Apache-2.0) Apache Avro (org.apache.avro:avro:1.11.3 - https://avro.apache.org) (Apache-2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.26.2 - https://commons.apache.org/proper/commons-compress/) (Apache-2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.14.0 - https://commons.apache.org/proper/commons-lang/) - (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.6.2 - https://kafka.apache.org) - (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.6.2 - https://kafka.apache.org) + (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.8.0 - https://kafka.apache.org) + (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.8.0 - https://kafka.apache.org) (Bouncy Castle Licence) Bouncy Castle PKIX, CMS, EAC, TSP, PKCS, OCSP, CMP, and CRMF APIs (org.bouncycastle:bcpkix-jdk18on:1.76 - https://www.bouncycastle.org/java.html) (Bouncy Castle Licence) Bouncy Castle Provider (org.bouncycastle:bcprov-jdk18on:1.76 - https://www.bouncycastle.org/java.html) (Bouncy Castle Licence) Bouncy Castle ASN.1 Extension and Utility APIs (org.bouncycastle:bcutil-jdk18on:1.76 - https://www.bouncycastle.org/java.html) diff --git a/pom.xml b/pom.xml index 6f3223ea..a05465a0 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ 1.11.3 1.12.0 1.26.2 - 3.6.2 + 3.8.0 7.6.1 23.1.2 23.1.2