From 7e4e4bd9b8089f7bae7d68eb3a381790d48f44ca Mon Sep 17 00:00:00 2001 From: jeroenvandisseldorp Date: Fri, 30 Aug 2024 15:14:13 +0200 Subject: [PATCH 1/3] Upgrade to Kafka 3.8.0 --- ksml-kafka-clients/NOTICE.txt | 12 +++---- .../ksml/client/admin/ResolvingAdmin.java | 35 ++++++++++--------- .../client/consumer/ForwardingConsumer.java | 10 +++--- .../client/producer/ForwardingProducer.java | 12 ++++--- .../client/producer/ResolvingProducer.java | 2 +- ksml/NOTICE.txt | 6 ++-- pom.xml | 2 +- 7 files changed, 43 insertions(+), 36 deletions(-) diff --git a/ksml-kafka-clients/NOTICE.txt b/ksml-kafka-clients/NOTICE.txt index 9776d03c..3c4a6dae 100644 --- a/ksml-kafka-clients/NOTICE.txt +++ b/ksml-kafka-clients/NOTICE.txt @@ -1,9 +1,9 @@ Lists of 20 third-party dependencies. - (The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.13.5 - http://github.com/FasterXML/jackson) - (The Apache Software License, Version 2.0) Jackson-core (com.fasterxml.jackson.core:jackson-core:2.13.5 - https://github.com/FasterXML/jackson-core) - (The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.13.5 - http://github.com/FasterXML/jackson) - (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni) + (The Apache Software License, Version 2.0) Jackson-annotations (com.fasterxml.jackson.core:jackson-annotations:2.16.2 - https://github.com/FasterXML/jackson) + (The Apache Software License, Version 2.0) Jackson-core (com.fasterxml.jackson.core:jackson-core:2.16.2 - https://github.com/FasterXML/jackson-core) + (The Apache Software License, Version 2.0) jackson-databind (com.fasterxml.jackson.core:jackson-databind:2.16.2 - https://github.com/FasterXML/jackson) + (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.6-3 - https://github.com/luben/zstd-jni) (The Apache Software License, Version 2.0) FindBugs-jsr305 (com.google.code.findbugs:jsr305:3.0.2 - http://findbugs.sourceforge.net/) (Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.26.1 - https://errorprone.info/error_prone_annotations) (The Apache Software License, Version 2.0) Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.2 - https://github.com/google/guava/failureaccess) @@ -12,8 +12,8 @@ Lists of 20 third-party dependencies. (Apache License, Version 2.0) J2ObjC Annotations (com.google.j2objc:j2objc-annotations:3.0.0 - https://github.com/google/j2objc/) (Apache-2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.14.0 - https://commons.apache.org/proper/commons-lang/) (Apache-2.0) Apache Commons Text (org.apache.commons:commons-text:1.12.0 - https://commons.apache.org/proper/commons-text) - (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.6.2 - https://kafka.apache.org) - (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.6.2 - https://kafka.apache.org) + (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.8.0 - https://kafka.apache.org) + (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.8.0 - https://kafka.apache.org) (The MIT License) Checker Qual (org.checkerframework:checker-qual:3.42.0 - https://checkerframework.org/) (The Apache Software License, Version 2.0) LZ4 and xxHash (org.lz4:lz4-java:1.8.0 - https://github.com/lz4/lz4-java) (The MIT License) Project Lombok (org.projectlombok:lombok:1.18.32 - https://projectlombok.org) diff --git a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/admin/ResolvingAdmin.java b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/admin/ResolvingAdmin.java index 57be8e6f..e7739e86 100644 --- a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/admin/ResolvingAdmin.java +++ b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/admin/ResolvingAdmin.java @@ -42,7 +42,7 @@ public class ResolvingAdmin extends ForwardingAdmin { public ResolvingAdmin(Map configs) { super(configs); - var config = new ResolvingClientConfig(configs); + final var config = new ResolvingClientConfig(configs); topicResolver = config.topicResolver; groupResolver = config.groupResolver; } @@ -58,7 +58,7 @@ public CreateTopicsResult createTopics(Collection newTopics, CreateTop @Override public DeleteTopicsResult deleteTopics(TopicCollection topicCollection, DeleteTopicsOptions options) { - var result = super.deleteTopics(topicResolver.resolve(topicCollection), options); + final var result = super.deleteTopics(topicResolver.resolve(topicCollection), options); return new ResolvingDeleteTopicsResult(result.topicIdValues(), result.topicNameValues(), topicResolver); } @@ -69,13 +69,13 @@ public ListTopicsResult listTopics(ListTopicsOptions options) { @Override public DescribeTopicsResult describeTopics(TopicCollection topicCollection, DescribeTopicsOptions options) { - var result = super.describeTopics(topicResolver.resolve(topicCollection), options); + final var result = super.describeTopics(topicResolver.resolve(topicCollection), options); return new ResolvingDescribeTopicsResult(result.topicIdValues(), result.topicNameValues(), topicResolver); } @Override public DescribeAclsResult describeAcls(AclBindingFilter filter, DescribeAclsOptions options) { - var result = super.describeAcls(ResolverUtil.resolve(filter, topicResolver, groupResolver), options); + final var result = super.describeAcls(ResolverUtil.resolve(filter, topicResolver, groupResolver), options); if (result == null) return null; return new ResolvingDescribeAclsResult(result.values(), topicResolver, groupResolver); } @@ -156,28 +156,31 @@ public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions opt @Override public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map groupSpecs, ListConsumerGroupOffsetsOptions options) { // Resolve the groupSpecs - var newGroupSpecs = new HashMap(); + final var newGroupSpecs = new HashMap(); groupSpecs.forEach((groupId, spec) -> newGroupSpecs.put(groupResolver.resolve(groupId), new ListConsumerGroupOffsetsSpec().topicPartitions(topicResolver.resolveTopicPartitions(spec.topicPartitions())))); // Resolve the options if (options != null) { - var newOptions = new ListConsumerGroupOffsetsOptions().requireStable(options.requireStable()); - if (options.topicPartitions() != null) { - var newTopicPartitions = new ArrayList(options.topicPartitions().size()); - options.topicPartitions().forEach(tp -> newTopicPartitions.add(topicResolver.resolve(tp))); + final var newOptions = new ListConsumerGroupOffsetsOptions().requireStable(options.requireStable()); + final var topicPartitions = options.topicPartitions(); + if (topicPartitions != null) { + final var newTopicPartitions = new ArrayList(topicPartitions.size()); + topicPartitions.forEach(tp -> newTopicPartitions.add(topicResolver.resolve(tp))); newOptions.topicPartitions(newTopicPartitions); } options = newOptions; } // Call the original API - var result = super.listConsumerGroupOffsets(newGroupSpecs, options); - // Convert the result to an unresolving result - var newResult = new HashMap>>(); + final var result = super.listConsumerGroupOffsets(newGroupSpecs, options); + + // Convert the result to an unresolved result + final var newResult = new HashMap>>(); newGroupSpecs.keySet().forEach(groupId -> { - var future = result.partitionsToOffsetAndMetadata(groupId); + final var future = result.partitionsToOffsetAndMetadata(groupId); newResult.put(CoordinatorKey.byGroupId(groupId), future); }); + return new ResolvingListConsumerGroupOffsetsResult(newResult, topicResolver, groupResolver); } @@ -262,9 +265,9 @@ public AlterClientQuotasResult alterClientQuotas(Collection resolveNewTopics(Collection newTopics) { // Resolve all new topics into a new collection - var resolvedTopics = new ArrayList(); - for (var newTopic : newTopics) { - var resolvedTopic = newTopic.replicasAssignments() == null + final var resolvedTopics = new ArrayList(); + for (final var newTopic : newTopics) { + final var resolvedTopic = newTopic.replicasAssignments() == null ? new NewTopic(topicResolver.resolve(newTopic.name()), newTopic.numPartitions(), newTopic.replicationFactor()) : new NewTopic(topicResolver.resolve(newTopic.name()), newTopic.replicasAssignments()); // Make sure that the config is added properly. Cleanup properties and timestamps are typical properties set in Streams diff --git a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/consumer/ForwardingConsumer.java b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/consumer/ForwardingConsumer.java index d6c7faa0..c37a5d23 100644 --- a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/consumer/ForwardingConsumer.java +++ b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/consumer/ForwardingConsumer.java @@ -21,10 +21,7 @@ */ import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.*; import java.time.Duration; import java.util.*; @@ -181,6 +178,11 @@ public Map committed(Set part return delegate.committed(partitions, timeout); } + @Override + public Uuid clientInstanceId(Duration duration) { + return delegate.clientInstanceId(duration); + } + @Override public Map metrics() { return delegate.metrics(); diff --git a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ForwardingProducer.java b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ForwardingProducer.java index a9eebe27..b23133bc 100644 --- a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ForwardingProducer.java +++ b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ForwardingProducer.java @@ -4,7 +4,7 @@ * ========================LICENSE_START================================= * Extended Kafka clients for KSML * %% - * Copyright (C) 2021 - 2023 Axual B.V. + * Copyright (C) 2021 - 2024 Axual B.V. * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -26,10 +26,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.Metric; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.*; import org.apache.kafka.common.errors.ProducerFencedException; import java.time.Duration; @@ -106,6 +103,11 @@ public List partitionsFor(String topic) { return delegate.metrics(); } + @Override + public Uuid clientInstanceId(Duration duration) { + return delegate.clientInstanceId(duration); + } + @Override public void close() { delegate.close(); diff --git a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ResolvingProducer.java b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ResolvingProducer.java index 817780ad..7c6716a9 100644 --- a/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ResolvingProducer.java +++ b/ksml-kafka-clients/src/main/java/io/axual/ksml/client/producer/ResolvingProducer.java @@ -54,11 +54,11 @@ private static RecordMetadata convertRecordMetadata(RecordMetadata input, String input.offset(), 0, input.timestamp(), - null, input.serializedKeySize(), input.serializedValueSize()); } + @Deprecated @Override public void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException { Map newOffsets = new HashMap<>(); diff --git a/ksml/NOTICE.txt b/ksml/NOTICE.txt index 885f149f..ae2728d2 100644 --- a/ksml/NOTICE.txt +++ b/ksml/NOTICE.txt @@ -8,7 +8,7 @@ Lists of 72 third-party dependencies. (The Apache Software License, Version 2.0) Jackson-dataformat-XML (com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.17.1 - https://github.com/FasterXML/jackson-dataformat-xml) (The Apache Software License, Version 2.0) Jackson-dataformat-YAML (com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:2.17.1 - https://github.com/FasterXML/jackson-dataformats-text) (The Apache License, Version 2.0) Woodstox (com.fasterxml.woodstox:woodstox-core:6.6.2 - https://github.com/FasterXML/woodstox) - (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.5-1 - https://github.com/luben/zstd-jni) + (BSD 2-Clause License) zstd-jni (com.github.luben:zstd-jni:1.5.6-3 - https://github.com/luben/zstd-jni) (The Apache Software License, Version 2.0) FindBugs-jsr305 (com.google.code.findbugs:jsr305:3.0.2 - http://findbugs.sourceforge.net/) (Apache 2.0) error-prone annotations (com.google.errorprone:error_prone_annotations:2.26.1 - https://errorprone.info/error_prone_annotations) (The Apache Software License, Version 2.0) Guava InternalFutureFailureAccess and InternalFutures (com.google.guava:failureaccess:1.0.2 - https://github.com/google/guava/failureaccess) @@ -39,8 +39,8 @@ Lists of 72 third-party dependencies. (Apache-2.0) Apache Avro (org.apache.avro:avro:1.11.3 - https://avro.apache.org) (Apache-2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.26.2 - https://commons.apache.org/proper/commons-compress/) (Apache-2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.14.0 - https://commons.apache.org/proper/commons-lang/) - (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.6.2 - https://kafka.apache.org) - (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.6.2 - https://kafka.apache.org) + (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-clients:3.8.0 - https://kafka.apache.org) + (The Apache License, Version 2.0) Apache Kafka (org.apache.kafka:kafka-streams:3.8.0 - https://kafka.apache.org) (Bouncy Castle Licence) Bouncy Castle PKIX, CMS, EAC, TSP, PKCS, OCSP, CMP, and CRMF APIs (org.bouncycastle:bcpkix-jdk18on:1.76 - https://www.bouncycastle.org/java.html) (Bouncy Castle Licence) Bouncy Castle Provider (org.bouncycastle:bcprov-jdk18on:1.76 - https://www.bouncycastle.org/java.html) (Bouncy Castle Licence) Bouncy Castle ASN.1 Extension and Utility APIs (org.bouncycastle:bcutil-jdk18on:1.76 - https://www.bouncycastle.org/java.html) diff --git a/pom.xml b/pom.xml index 6f3223ea..a05465a0 100644 --- a/pom.xml +++ b/pom.xml @@ -64,7 +64,7 @@ 1.11.3 1.12.0 1.26.2 - 3.6.2 + 3.8.0 7.6.1 23.1.2 23.1.2 From f853d25787115d4915e4aa4b6fdffd651fae5690 Mon Sep 17 00:00:00 2001 From: jeroenvandisseldorp Date: Mon, 23 Sep 2024 08:33:59 +0200 Subject: [PATCH 2/3] Update byte manipulation example - Generate example data for byte manipulation example - Have byte manipulation example read from alternative topic --- examples/00-example-generate-sensordata.yaml | 10 ++++++++++ examples/12-example-byte-manipulation.yaml | 10 +++++----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/examples/00-example-generate-sensordata.yaml b/examples/00-example-generate-sensordata.yaml index c21822a1..46b73278 100644 --- a/examples/00-example-generate-sensordata.yaml +++ b/examples/00-example-generate-sensordata.yaml @@ -44,3 +44,13 @@ producers: topic: ksml_sensordata_avro keyType: string valueType: avro:SensorData + + # The following producer generates messages for 12-example-byte-manipulation.yaml. Enable this producer if you + # want to run that specific example. + sensordata_avro_producer_binary: + generator: generate_sensordata_message + interval: 3s + to: + topic: ksml_sensordata_avro_binary + keyType: string + valueType: avro:SensorData diff --git a/examples/12-example-byte-manipulation.yaml b/examples/12-example-byte-manipulation.yaml index efa7c011..9973b6fa 100644 --- a/examples/12-example-byte-manipulation.yaml +++ b/examples/12-example-byte-manipulation.yaml @@ -8,12 +8,12 @@ # message contains a schema id that is not locally recognized. By changing bytes 1-4 in the value, # one can override the schema id for further downstream processing. -# Yes, this is hacky, but it may serve a purpose for cases where binary copies are made from -# remote Kafka clusters that contain conflicting schema ids. +# Yes, this is hacky, but it may serve a purpose for cases where binary message copies are made from +# remote Kafka clusters with their own (possibly conflicting) schema ids. streams: - sensor_source: - topic: ksml_sensordata_avro + sensor_binary_source: + topic: ksml_sensordata_avro_binary keyType: string valueType: bytes offsetResetPolicy: latest @@ -44,7 +44,7 @@ functions: pipelines: main: - from: sensor_source + from: sensor_binary_source via: - type: peek forEach: From 5549d326c78affec293f84eeaf86b76dc778516d Mon Sep 17 00:00:00 2001 From: jeroenvandisseldorp Date: Tue, 24 Sep 2024 18:05:41 +0200 Subject: [PATCH 3/3] Improve example 12 - Allow example 12 to run in parallel to other examples by consuming from a separate topic - Improve on the Python mapping for Java arrays by forcing a conversion to Python lists --- examples/12-example-byte-manipulation.yaml | 2 +- .../io/axual/ksml/python/PythonDataObjectMapper.java | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/examples/12-example-byte-manipulation.yaml b/examples/12-example-byte-manipulation.yaml index 9973b6fa..7bad4b34 100644 --- a/examples/12-example-byte-manipulation.yaml +++ b/examples/12-example-byte-manipulation.yaml @@ -30,7 +30,7 @@ functions: code: | global newSchemaId log.info("Replacing schema in message value: {}", value) - if value is not None: + if isinstance(value, list): if value[0] == 0 and len(value) >= 5: value[1] = (newSchemaId & 0xff000000) >> 24 value[2] = (newSchemaId & 0xff0000) >> 16 diff --git a/ksml/src/main/java/io/axual/ksml/python/PythonDataObjectMapper.java b/ksml/src/main/java/io/axual/ksml/python/PythonDataObjectMapper.java index fca42e17..6fd5c830 100644 --- a/ksml/src/main/java/io/axual/ksml/python/PythonDataObjectMapper.java +++ b/ksml/src/main/java/io/axual/ksml/python/PythonDataObjectMapper.java @@ -28,6 +28,7 @@ import io.axual.ksml.util.ExecutionUtil; import org.graalvm.polyglot.Value; +import java.util.ArrayList; import java.util.Map; public class PythonDataObjectMapper extends NativeDataObjectMapper { @@ -146,7 +147,13 @@ public Value fromDataObject(DataObject object) { if (object instanceof DataLong val) return Value.asValue(val.value()); if (object instanceof DataFloat val) return Value.asValue(val.value()); if (object instanceof DataDouble val) return Value.asValue(val.value()); - if (object instanceof DataBytes val) return Value.asValue(val.value()); + if (object instanceof DataBytes val) { + // Convert the contained byte array to a list, so it can be converted to a Python list by the PythonFunction + // wrapper code downstream... + final var bytes = new ArrayList(val.value().length); + for (byte b : val.value()) bytes.add(b); + return Value.asValue(bytes); + } if (object instanceof DataString val) return Value.asValue(val.value()); if (object instanceof DataList val) return Value.asValue(fromDataList(val)); if (object instanceof DataStruct val) return Value.asValue(fromDataStruct(val));