From a9ef6560c9184c8db892c9f76cfadc556bcbece8 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 15 Sep 2025 15:27:29 +0200 Subject: [PATCH 01/13] Add lineage processors --- .../kafka/streams/kstream/KStreamX.java | 4 + .../kafka/streams/kstream/KStreamXImpl.java | 10 +++ .../kafka/streams/kstream/KTableX.java | 4 + .../kafka/streams/kstream/KTableXImpl.java | 10 +++ .../kafka/streams/kstream/LineageHeaders.java | 69 +++++++++++++++ .../streams/kstream/LineageProcessor.java | 59 +++++++++++++ .../streams/kstream/LineageTransformer.java | 52 ++++++++++++ .../kafka/streams/kstream/KStreamXTest.java | 84 ++++++++++++++++++ .../kafka/streams/kstream/KTableXTest.java | 85 +++++++++++++++++++ 9 files changed, 377 insertions(+) create mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java create mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java create mode 100644 streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java index 55f3b0162..50bf392fc 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java @@ -1011,4 +1011,8 @@ KErrorStreamX processValuesCapturingErrors( FixedKeyProcessorSupplier processorSupplier, java.util.function.Predicate errorFilter, Named named, String... 
stateStoreNames); + + KStreamX withLineage(); + + KStreamX withLineage(Named named); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java index 0bdd0b6ac..7a412f30b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java @@ -936,6 +936,16 @@ public KErrorStreamX processValuesCapturingErrors( ErrorCapturingValueProcessor.captureErrors(processorSupplier, errorFilter), named, stateStoreNames); } + @Override + public KStreamX withLineage() { + return this.processValues(LineageProcessor::new); + } + + @Override + public KStreamX withLineage(final Named named) { + return this.processValues(LineageProcessor::new, named); + } + private KeyValueKErrorStreamX mapCapturingErrorsInternal( final KeyValueMapper>> mapper) { final KStreamX> map = this.map(mapper); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java index 3a77d5c85..0c4e44b85 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java @@ -429,4 +429,8 @@ KTableX leftJoin(KTable other, BiFunction KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor, ValueJoiner joiner, TableJoined tableJoined, MaterializedX> materialized); + + KTableX withLineage(); + + KTableX withLineage(Named named); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java index 34b7c250b..790bbf049 100644 --- 
a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java @@ -613,6 +613,16 @@ public KTableX leftJoin(final KTable other, materialized.configure(this.context.getConfigurator())); } + @Override + public KTableX withLineage() { + return this.transformValues(LineageTransformer::new); + } + + @Override + public KTableX withLineage(final Named named) { + return this.transformValues(LineageTransformer::new, named); + } + @Override public String queryableStoreName() { return this.wrapped.queryableStoreName(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java new file mode 100644 index 000000000..f93554127 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java @@ -0,0 +1,69 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka.streams.kstream; + +import java.nio.charset.StandardCharsets; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.experimental.Accessors; +import org.apache.kafka.common.header.Headers; + +@RequiredArgsConstructor +public class LineageHeaders { + private static final String LINEAGE_PREFIX = "lineage."; + public static final String TOPIC_HEADER = LINEAGE_PREFIX + "topic"; + public static final String PARTITION_HEADER = LINEAGE_PREFIX + "partition"; + public static final String OFFSET_HEADER = LINEAGE_PREFIX + "offset"; + + @Getter + @Accessors(fluent = true) + private final @NonNull Headers headers; + + public LineageHeaders addTopicHeader(final String topic) { + if (topic == null) { + return this; + } + return new LineageHeaders(this.headers.add(TOPIC_HEADER, topic.getBytes(StandardCharsets.UTF_8))); + } + + public LineageHeaders addPartitionHeader(final int partition) { + if (partition < 0) { + return this; + } + //TODO serialize more compact as int? But then UI tools usually can't handle it + return new LineageHeaders( + this.headers.add(PARTITION_HEADER, Integer.toString(partition).getBytes(StandardCharsets.UTF_8))); + } + + public LineageHeaders addOffsetHeader(final long offset) { + if (offset < 0) { + return this; + } + //TODO serialize more compact as long? 
But then UI tools usually can't handle it + return new LineageHeaders( + this.headers.add(OFFSET_HEADER, Long.toString(offset).getBytes(StandardCharsets.UTF_8))); + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java new file mode 100644 index 000000000..890b55ed4 --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java @@ -0,0 +1,59 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + +package com.bakdata.kafka.streams.kstream; + +import java.util.Optional; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.processor.api.RecordMetadata; + +class LineageProcessor implements FixedKeyProcessor { + private FixedKeyProcessorContext context; + + private static Headers addHeaders(final Headers headers, final RecordMetadata metadata) { + return new LineageHeaders(new RecordHeaders(headers)) + .addTopicHeader(metadata.topic()) + .addPartitionHeader(metadata.partition()) + .addOffsetHeader(metadata.offset()) + .headers(); + } + + @Override + public void init(final FixedKeyProcessorContext context) { + this.context = context; + } + + @Override + public void process(final FixedKeyRecord record) { + final Optional metadata = this.context.recordMetadata(); + final Headers headers = record.headers(); + final Headers newHeaders = metadata.map(m -> addHeaders(headers, m)) + .orElse(headers); + this.context.forward(record.withHeaders(newHeaders)); + } +} diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java new file mode 100644 index 000000000..5f15b736d --- /dev/null +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java @@ -0,0 +1,52 @@ +/* + * MIT License + * + * Copyright (c) 2025 bakdata + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, 
publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.bakdata.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.processor.ProcessorContext; + +class LineageTransformer implements ValueTransformerWithKey { + + private ProcessorContext context; + + @Override + public void init(final ProcessorContext context) { + this.context = context; + } + + @Override + public V transform(final K readOnlyKey, final V value) { + new LineageHeaders(this.context.headers()) + .addTopicHeader(this.context.topic()) + .addPartitionHeader(this.context.partition()) + .addOffsetHeader(this.context.offset()); + return value; + } + + @Override + public void close() { + // do nothing + } +} diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java index c3070a976..a18a57cdb 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java @@ -39,9 +39,11 @@ import 
com.bakdata.kafka.streams.apps.DoubleApp; import com.bakdata.kafka.streams.apps.StringApp; import java.io.File; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import java.util.Map; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; @@ -3637,4 +3639,86 @@ public void buildTopology(final StreamsBuilderX builder) { } } + @Test + void shouldAddLineage() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KStreamX input = builder.stream("input"); + input.withLineage().to("output"); + } + }; + try (final TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()) + .hasSize(3) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); + this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) + .isEqualTo("input"); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); + this.softly.assertThat( + Integer.parseInt(new String(header.value(), + StandardCharsets.UTF_8))) + .isEqualTo(0); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); + this.softly.assertThat( + Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) + .isEqualTo(0L); + }); + }); + } + } + + @Test + void shouldAddLineageNamed() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final 
StreamsBuilderX builder) { + final KStreamX input = builder.stream("input"); + input.withLineage(Named.as("lineage")).to("output"); + } + }; + try (final TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()) + .hasSize(3) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); + this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) + .isEqualTo("input"); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); + this.softly.assertThat( + Integer.parseInt(new String(header.value(), + StandardCharsets.UTF_8))) + .isEqualTo(0); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); + this.softly.assertThat( + Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) + .isEqualTo(0L); + }); + }); + } + } + } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java index 1cc3aa29c..f76326bb4 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java @@ -33,8 +33,11 @@ import com.bakdata.kafka.streams.StreamsTopicConfig; import com.bakdata.kafka.streams.apps.DoubleApp; import com.bakdata.kafka.streams.apps.StringApp; +import java.nio.charset.StandardCharsets; import java.time.Duration; +import java.util.List; import java.util.function.Function; +import 
org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; @@ -1840,4 +1843,86 @@ void shouldHaveQueryableStoreName() { this.softly.assertThat(table.queryableStoreName()).isEqualTo("store"); } + @Test + void shouldAddLineage() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KTableX input = builder.table("input"); + input.withLineage().toStream().to("output"); + } + }; + try (final TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()) + .hasSize(3) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); + this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) + .isEqualTo("input"); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); + this.softly.assertThat( + Integer.parseInt(new String(header.value(), + StandardCharsets.UTF_8))) + .isEqualTo(0); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); + this.softly.assertThat( + Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) + .isEqualTo(0L); + }); + }); + } + } + + @Test + void shouldAddLineageNamed() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KTableX input = builder.table("input"); + input.withLineage(Named.as("lineage")).toStream().to("output"); + } + }; + try (final TestTopology topology = 
app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()) + .hasSize(3) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); + this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) + .isEqualTo("input"); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); + this.softly.assertThat( + Integer.parseInt(new String(header.value(), + StandardCharsets.UTF_8))) + .isEqualTo(0); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); + this.softly.assertThat( + Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) + .isEqualTo(0L); + }); + }); + } + } + } From c0845eb56c49aeb9c9c9dcc39164e75472375311 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 15 Sep 2025 15:52:04 +0200 Subject: [PATCH 02/13] Clean up --- .../kafka/streams/kstream/KTableX.java | 4 + .../kafka/streams/kstream/LineageHeaders.java | 11 +-- .../streams/kstream/LineageProcessor.java | 6 +- .../kafka/streams/kstream/KTableXTest.java | 86 +++++++++++++++++++ 4 files changed, 99 insertions(+), 8 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java index 0c4e44b85..3213b6332 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java @@ -433,4 +433,8 @@ KTableX leftJoin(KTable other, BiFunction KTableX withLineage(); KTableX withLineage(Named 
named); + + KTableX withLineage(MaterializedX> materialized); + + KTableX withLineage(MaterializedX> materialized, Named named); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java index f93554127..99303ce50 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java @@ -25,31 +25,32 @@ package com.bakdata.kafka.streams.kstream; import java.nio.charset.StandardCharsets; +import lombok.AccessLevel; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.experimental.Accessors; import org.apache.kafka.common.header.Headers; -@RequiredArgsConstructor +@RequiredArgsConstructor(access = AccessLevel.PACKAGE) public class LineageHeaders { private static final String LINEAGE_PREFIX = "lineage."; public static final String TOPIC_HEADER = LINEAGE_PREFIX + "topic"; public static final String PARTITION_HEADER = LINEAGE_PREFIX + "partition"; public static final String OFFSET_HEADER = LINEAGE_PREFIX + "offset"; - @Getter + @Getter(AccessLevel.PACKAGE) @Accessors(fluent = true) private final @NonNull Headers headers; - public LineageHeaders addTopicHeader(final String topic) { + LineageHeaders addTopicHeader(final String topic) { if (topic == null) { return this; } return new LineageHeaders(this.headers.add(TOPIC_HEADER, topic.getBytes(StandardCharsets.UTF_8))); } - public LineageHeaders addPartitionHeader(final int partition) { + LineageHeaders addPartitionHeader(final int partition) { if (partition < 0) { return this; } @@ -58,7 +59,7 @@ public LineageHeaders addPartitionHeader(final int partition) { this.headers.add(PARTITION_HEADER, Integer.toString(partition).getBytes(StandardCharsets.UTF_8))); } - public LineageHeaders addOffsetHeader(final long 
offset) { + LineageHeaders addOffsetHeader(final long offset) { if (offset < 0) { return this; } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java index 890b55ed4..6a57196c9 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java @@ -49,11 +49,11 @@ public void init(final FixedKeyProcessorContext context) { } @Override - public void process(final FixedKeyRecord record) { + public void process(final FixedKeyRecord rekord) { final Optional metadata = this.context.recordMetadata(); - final Headers headers = record.headers(); + final Headers headers = rekord.headers(); final Headers newHeaders = metadata.map(m -> addHeaders(headers, m)) .orElse(headers); - this.context.forward(record.withHeaders(newHeaders)); + this.context.forward(rekord.withHeaders(newHeaders)); } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java index f76326bb4..aa24ad506 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java @@ -1925,4 +1925,90 @@ public void buildTopology(final StreamsBuilderX builder) { } } + @Test + void shouldAddLineageUsingMaterialized() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KTableX input = builder.table("input"); + input.withLineage(MaterializedX.with(Serdes.String(), Serdes.String())) + .toStream() + .to("output"); + } + }; + try (final TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + 
final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()) + .hasSize(3) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); + this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) + .isEqualTo("input"); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); + this.softly.assertThat( + Integer.parseInt(new String(header.value(), + StandardCharsets.UTF_8))) + .isEqualTo(0); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); + this.softly.assertThat( + Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) + .isEqualTo(0L); + }); + }); + } + } + + @Test + void shouldAddLineageNamedUsingMaterialized() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KTableX input = builder.table("input"); + input.withLineage(MaterializedX.with(Serdes.String(), Serdes.String()), Named.as("lineage")) + .toStream() + .to("output"); + } + }; + try (final TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()) + .hasSize(3) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); + this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) + .isEqualTo("input"); + }) + .anySatisfy(header -> { + 
this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); + this.softly.assertThat( + Integer.parseInt(new String(header.value(), + StandardCharsets.UTF_8))) + .isEqualTo(0); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); + this.softly.assertThat( + Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) + .isEqualTo(0L); + }); + }); + } + } + } From acc6c2cd0567cebc5e6ee27c29a83db4cb9a2583 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 15 Sep 2025 15:54:10 +0200 Subject: [PATCH 03/13] Clean up --- .../com/bakdata/kafka/streams/kstream/KTableXImpl.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java index 790bbf049..eeb0c89af 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java @@ -623,6 +623,16 @@ public KTableX withLineage(final Named named) { return this.transformValues(LineageTransformer::new, named); } + @Override + public KTableX withLineage(final MaterializedX> materialized) { + return this.transformValues(LineageTransformer::new, materialized); + } + + @Override + public KTableX withLineage(final MaterializedX> materialized, final Named named) { + return this.transformValues(LineageTransformer::new, materialized, named); + } + @Override public String queryableStoreName() { return this.wrapped.queryableStoreName(); From f992b62814caa8e3e35b4af2875a9c466dee1d8e Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 15 Sep 2025 16:13:01 +0200 Subject: [PATCH 04/13] Add docs --- .../kafka/streams/kstream/KStreamX.java | 20 +++++++++ .../kafka/streams/kstream/KTableX.java | 43 +++++++++++++++++++ 
.../kafka/streams/kstream/LineageHeaders.java | 9 ++++ 3 files changed, 72 insertions(+) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java index 50bf392fc..790d9ce41 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java @@ -1012,7 +1012,27 @@ KErrorStreamX processValuesCapturingErrors( java.util.function.Predicate errorFilter, Named named, String... stateStoreNames); + /** + * Add lineage information to each record in the stream. This will add the following headers + *
  • + *
      {@link LineageHeaders#TOPIC_HEADER}
    + *
      {@link LineageHeaders#PARTITION_HEADER}
    + *
      {@link LineageHeaders#OFFSET_HEADER}
    + *
  • + * + * @return stream with added headers + */ KStreamX withLineage(); + /** + * Add lineage information to each record in the stream. This will add the following headers + *
  • + *
      {@link LineageHeaders#TOPIC_HEADER}
    + *
      {@link LineageHeaders#PARTITION_HEADER}
    + *
      {@link LineageHeaders#OFFSET_HEADER}
    + *
  • + * @param named a {@link Named} config used to name the processor in the topology + * @return stream with added headers + */ KStreamX withLineage(Named named); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java index 3213b6332..de4dfc656 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java @@ -430,11 +430,54 @@ KTableX leftJoin(KTable other, BiFunction ValueJoiner joiner, TableJoined tableJoined, MaterializedX> materialized); + /** + * Add lineage information to each record in the table. This will add the following headers + *
  • + *
      {@link LineageHeaders#TOPIC_HEADER}
    + *
      {@link LineageHeaders#PARTITION_HEADER}
    + *
      {@link LineageHeaders#OFFSET_HEADER}
    + *
  • + * + * @return table with added headers + */ KTableX withLineage(); + /** + * Add lineage information to each record in the table. This will add the following headers + *
  • + *
      {@link LineageHeaders#TOPIC_HEADER}
    + *
      {@link LineageHeaders#PARTITION_HEADER}
    + *
      {@link LineageHeaders#OFFSET_HEADER}
    + *
  • + * @param named a {@link Named} config used to name the processor in the topology + * @return table with added headers + */ KTableX withLineage(Named named); + /** + * Add lineage information to each record in the table. This will add the following headers + *
  • + *
      {@link LineageHeaders#TOPIC_HEADER}
    + *
      {@link LineageHeaders#PARTITION_HEADER}
    + *
      {@link LineageHeaders#OFFSET_HEADER}
    + *
  • + * @param materialized an instance of {@link Materialized} used to describe how the state store of the resulting + * table should be materialized. Cannot be {@code null} + * @return table with added headers + */ KTableX withLineage(MaterializedX> materialized); + /** + * Add lineage information to each record in the table. This will add the following headers + *
  • + *
      {@link LineageHeaders#TOPIC_HEADER}
    + *
      {@link LineageHeaders#PARTITION_HEADER}
    + *
      {@link LineageHeaders#OFFSET_HEADER}
    + *
  • + * @param materialized an instance of {@link Materialized} used to describe how the state store of the resulting + * table should be materialized. Cannot be {@code null} + * @param named a {@link Named} config used to name the processor in the topology + * @return stream with added headers + */ KTableX withLineage(MaterializedX> materialized, Named named); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java index 99303ce50..c04698484 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java @@ -35,8 +35,17 @@ @RequiredArgsConstructor(access = AccessLevel.PACKAGE) public class LineageHeaders { private static final String LINEAGE_PREFIX = "lineage."; + /** + * Header indicating the topic the record was read from. + */ public static final String TOPIC_HEADER = LINEAGE_PREFIX + "topic"; + /** + * Header indicating the partition the record was read from. + */ public static final String PARTITION_HEADER = LINEAGE_PREFIX + "partition"; + /** + * Header indicating the offset the record was read from. 
+ */ public static final String OFFSET_HEADER = LINEAGE_PREFIX + "offset"; @Getter(AccessLevel.PACKAGE) From 58e75697b26c1049aaf6a914a5edc44d1c964071 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 15 Sep 2025 16:17:17 +0200 Subject: [PATCH 05/13] Add docs --- .../kafka/streams/kstream/KStreamX.java | 20 +++++----- .../kafka/streams/kstream/KTableX.java | 40 +++++++++---------- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java index 790d9ce41..1fb08b6f8 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java @@ -1014,11 +1014,11 @@ KErrorStreamX processValuesCapturingErrors( /** * Add lineage information to each record in the stream. This will add the following headers - *
  • - *
      {@link LineageHeaders#TOPIC_HEADER}
    - *
      {@link LineageHeaders#PARTITION_HEADER}
    - *
      {@link LineageHeaders#OFFSET_HEADER}
    - *
  • + *
      + *
    • {@link LineageHeaders#TOPIC_HEADER}
    • + *
    • {@link LineageHeaders#PARTITION_HEADER}
    • + *
    • {@link LineageHeaders#OFFSET_HEADER}
    • + *
    * * @return stream with added headers */ @@ -1026,11 +1026,11 @@ KErrorStreamX processValuesCapturingErrors( /** * Add lineage information to each record in the stream. This will add the following headers - *
  • - *
      {@link LineageHeaders#TOPIC_HEADER}
    - *
      {@link LineageHeaders#PARTITION_HEADER}
    - *
      {@link LineageHeaders#OFFSET_HEADER}
    - *
  • + *
      + *
    • {@link LineageHeaders#TOPIC_HEADER}
    • + *
    • {@link LineageHeaders#PARTITION_HEADER}
    • + *
    • {@link LineageHeaders#OFFSET_HEADER}
    • + *
    * @param named a {@link Named} config used to name the processor in the topology * @return stream with added headers */ diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java index de4dfc656..61fa023a5 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java @@ -432,11 +432,11 @@ KTableX leftJoin(KTable other, BiFunction /** * Add lineage information to each record in the stream. This will add the following headers - *
  • - *
      {@link LineageHeaders#TOPIC_HEADER}
    - *
      {@link LineageHeaders#PARTITION_HEADER}
    - *
      {@link LineageHeaders#OFFSET_HEADER}
    - *
  • + *
      + *
    • {@link LineageHeaders#TOPIC_HEADER}
    • + *
    • {@link LineageHeaders#PARTITION_HEADER}
    • + *
    • {@link LineageHeaders#OFFSET_HEADER}
    • + *
    * * @return stream with added headers */ @@ -444,11 +444,11 @@ KTableX leftJoin(KTable other, BiFunction /** * Add lineage information to each record in the stream. This will add the following headers - *
  • - *
      {@link LineageHeaders#TOPIC_HEADER}
    - *
      {@link LineageHeaders#PARTITION_HEADER}
    - *
      {@link LineageHeaders#OFFSET_HEADER}
    - *
  • + *
      + *
    • {@link LineageHeaders#TOPIC_HEADER}
    • + *
    • {@link LineageHeaders#PARTITION_HEADER}
    • + *
    • {@link LineageHeaders#OFFSET_HEADER}
    • + *
    * @param named a {@link Named} config used to name the processor in the topology * @return stream with added headers */ @@ -456,11 +456,11 @@ KTableX leftJoin(KTable other, BiFunction /** * Add lineage information to each record in the stream. This will add the following headers - *
  • - *
      {@link LineageHeaders#TOPIC_HEADER}
    - *
      {@link LineageHeaders#PARTITION_HEADER}
    - *
      {@link LineageHeaders#OFFSET_HEADER}
    - *
  • + *
      + *
    • {@link LineageHeaders#TOPIC_HEADER}
    • + *
    • {@link LineageHeaders#PARTITION_HEADER}
    • + *
    • {@link LineageHeaders#OFFSET_HEADER}
    • + *
    * @param materialized an instance of {@link Materialized} used to describe how the state store of the resulting * table should be materialized. Cannot be {@code null} * @return stream with added headers @@ -469,11 +469,11 @@ KTableX leftJoin(KTable other, BiFunction /** * Add lineage information to each record in the stream. This will add the following headers - *
  • - *
      {@link LineageHeaders#TOPIC_HEADER}
    - *
      {@link LineageHeaders#PARTITION_HEADER}
    - *
      {@link LineageHeaders#OFFSET_HEADER}
    - *
  • + *
      + *
    • {@link LineageHeaders#TOPIC_HEADER}
    • + *
    • {@link LineageHeaders#PARTITION_HEADER}
    • + *
    • {@link LineageHeaders#OFFSET_HEADER}
    • + *
    * @param materialized an instance of {@link Materialized} used to describe how the state store of the resulting * table should be materialized. Cannot be {@code null} * @param named a {@link Named} config used to name the processor in the topology From 2ab7c5588a01ce2f34fe40e4c63d5cc4a46ddc89 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 15 Sep 2025 17:58:10 +0200 Subject: [PATCH 06/13] Add docs --- .../java/com/bakdata/kafka/streams/kstream/KTableXImpl.java | 3 ++- .../java/com/bakdata/kafka/streams/kstream/LineageHeaders.java | 3 +++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java index eeb0c89af..7929dbf1e 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java @@ -629,7 +629,8 @@ public KTableX withLineage(final MaterializedX withLineage(final MaterializedX> materialized, final Named named) { + public KTableX withLineage(final MaterializedX> materialized, + final Named named) { return this.transformValues(LineageTransformer::new, materialized, named); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java index c04698484..b4ccddfda 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java @@ -32,6 +32,9 @@ import lombok.experimental.Accessors; import org.apache.kafka.common.header.Headers; +/** + * Configure headers for data lineage of Kafka messages + */ @RequiredArgsConstructor(access = AccessLevel.PACKAGE) public class LineageHeaders { 
private static final String LINEAGE_PREFIX = "lineage."; From 97ace919a9adca07dc405fdde69eb994045594dc Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 15 Sep 2025 20:53:17 +0200 Subject: [PATCH 07/13] Update tests --- .../kafka/streams/kstream/KTableXTest.java | 40 +++++++++++++------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java index aa24ad506..184d4862c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java @@ -40,10 +40,12 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TableJoined; @@ -1927,18 +1929,25 @@ public void buildTopology(final StreamsBuilderX builder) { @Test void shouldAddLineageUsingMaterialized() { - final StringApp app = new StringApp() { + final DoubleApp app = new DoubleApp() { @Override public void buildTopology(final StreamsBuilderX builder) { - final KTableX input = builder.table("input"); + final KTableX input = + builder.table("input", Consumed.with(Serdes.String(), Serdes.String())); input.withLineage(MaterializedX.with(Serdes.String(), Serdes.String())) .toStream() - .to("output"); + .to("output", Produced.with(Serdes.String(), 
Serdes.String())); } }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final List> records = topology.streamOutput().toList(); + try (final TestTopology topology = app.startApp()) { + topology.input() + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + .add("foo", "bar"); + final List> records = topology.streamOutput() + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + .toList(); this.softly.assertThat(records) .hasSize(1) .anySatisfy(rekord -> { @@ -1970,18 +1979,25 @@ public void buildTopology(final StreamsBuilderX builder) { @Test void shouldAddLineageNamedUsingMaterialized() { - final StringApp app = new StringApp() { + final DoubleApp app = new DoubleApp() { @Override public void buildTopology(final StreamsBuilderX builder) { - final KTableX input = builder.table("input"); + final KTableX input = + builder.table("input", Consumed.with(Serdes.String(), Serdes.String())); input.withLineage(MaterializedX.with(Serdes.String(), Serdes.String()), Named.as("lineage")) .toStream() - .to("output"); + .to("output", Produced.with(Serdes.String(), Serdes.String())); } }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final List> records = topology.streamOutput().toList(); + try (final TestTopology topology = app.startApp()) { + topology.input() + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + .add("foo", "bar"); + final List> records = topology.streamOutput() + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + .toList(); this.softly.assertThat(records) .hasSize(1) .anySatisfy(rekord -> { From b6763a5b41fda0cb3011e3c0f997bc8c85a5e05a Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Oct 2025 13:11:00 +0100 Subject: [PATCH 08/13] Configure via property --- .../bakdata/kafka/streams/StreamsConfigX.java | 44 ++++- .../kafka/streams/kstream/KStreamX.java | 24 --- 
.../kafka/streams/kstream/KStreamXImpl.java | 10 - .../kafka/streams/kstream/KTableX.java | 51 ----- .../kafka/streams/kstream/KTableXImpl.java | 21 -- .../streams/kstream/StreamsBuilderX.java | 88 ++++++--- .../kafka/streams/kstream/KStreamXTest.java | 84 -------- .../kafka/streams/kstream/KTableXTest.java | 187 ------------------ .../streams/kstream/StreamsBuilderXTest.java | 151 ++++++++++++++ 9 files changed, 251 insertions(+), 409 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java index ba75b714b..645305fa0 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java @@ -30,8 +30,14 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import lombok.EqualsAndHashCode; import lombok.NonNull; +import lombok.ToString; import lombok.Value; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.state.HostInfo; @@ -39,36 +45,59 @@ * Class for simplified access to configs provided by {@link StreamsConfig} */ @Value -public class StreamsConfigX { +@EqualsAndHashCode(callSuper = true) +@ToString(callSuper = true) +public class StreamsConfigX extends AbstractConfig { + public static final String LINEAGE_ENABLED_CONFIG = "streams.bootstrap.lineage.enabled"; + private static final String LINEAGE_ENABLED_DOC = ""; + private static final ConfigDef CONFIG_DEF = StreamsConfig.configDef() + .define(LINEAGE_ENABLED_CONFIG, Type.BOOLEAN, false, Importance.LOW, LINEAGE_ENABLED_DOC); @NonNull StreamsConfig streamsConfig; + public StreamsConfigX(final 
StreamsConfig streamsConfig) { + super(CONFIG_DEF, streamsConfig.originals()); + this.streamsConfig = streamsConfig; + } + + private static HostInfo createHostInfo(final String applicationServerConfig) { + final String[] hostAndPort = applicationServerConfig.split(":"); + return new HostInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1])); + } + /** * Get the application id of the underlying {@link StreamsConfig} + * * @return application id * @see StreamsConfig#APPLICATION_ID_CONFIG */ public String getAppId() { - return this.streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG); + return this.getString(StreamsConfig.APPLICATION_ID_CONFIG); } /** * Get the bootstrap servers of the underlying {@link StreamsConfig} + * * @return list of bootstrap servers * @see StreamsConfig#BOOTSTRAP_SERVERS_CONFIG */ public List getBoostrapServers() { - return this.streamsConfig.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + return this.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); + } + + public boolean isLineageEnabled() { + return this.getBoolean(LINEAGE_ENABLED_CONFIG); } /** * Get all configs of the underlying {@link StreamsConfig} + * * @return Kafka configs * @see StreamsConfig#originals() */ public Map getKafkaProperties() { - return Collections.unmodifiableMap(this.streamsConfig.originals()); + return Collections.unmodifiableMap(this.originals()); } /** @@ -78,13 +107,8 @@ public Map getKafkaProperties() { * {@link StreamsConfig#APPLICATION_SERVER_CONFIG} is set; otherwise, an empty {@link Optional}. */ public Optional getApplicationServer() { - final String applicationServerConfig = this.streamsConfig.getString(APPLICATION_SERVER_CONFIG); + final String applicationServerConfig = this.getString(APPLICATION_SERVER_CONFIG); return applicationServerConfig.isEmpty() ? 
Optional.empty() : Optional.of(createHostInfo(applicationServerConfig)); } - - private static HostInfo createHostInfo(final String applicationServerConfig) { - final String[] hostAndPort = applicationServerConfig.split(":"); - return new HostInfo(hostAndPort[0], Integer.parseInt(hostAndPort[1])); - } } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java index 1fb08b6f8..55f3b0162 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamX.java @@ -1011,28 +1011,4 @@ KErrorStreamX processValuesCapturingErrors( FixedKeyProcessorSupplier processorSupplier, java.util.function.Predicate errorFilter, Named named, String... stateStoreNames); - - /** - * Add lineage information to each record in the stream. This will add the following headers - *
      - *
    • {@link LineageHeaders#TOPIC_HEADER}
    • - *
    • {@link LineageHeaders#PARTITION_HEADER}
    • - *
    • {@link LineageHeaders#OFFSET_HEADER}
    • - *
    - * - * @return stream with added headers - */ - KStreamX withLineage(); - - /** - * Add lineage information to each record in the stream. This will add the following headers - *
      - *
    • {@link LineageHeaders#TOPIC_HEADER}
    • - *
    • {@link LineageHeaders#PARTITION_HEADER}
    • - *
    • {@link LineageHeaders#OFFSET_HEADER}
    • - *
    - * @param named a {@link Named} config used to name the processor in the topology - * @return stream with added headers - */ - KStreamX withLineage(Named named); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java index 7a412f30b..0bdd0b6ac 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KStreamXImpl.java @@ -936,16 +936,6 @@ public KErrorStreamX processValuesCapturingErrors( ErrorCapturingValueProcessor.captureErrors(processorSupplier, errorFilter), named, stateStoreNames); } - @Override - public KStreamX withLineage() { - return this.processValues(LineageProcessor::new); - } - - @Override - public KStreamX withLineage(final Named named) { - return this.processValues(LineageProcessor::new, named); - } - private KeyValueKErrorStreamX mapCapturingErrorsInternal( final KeyValueMapper>> mapper) { final KStreamX> map = this.map(mapper); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java index 61fa023a5..3a77d5c85 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableX.java @@ -429,55 +429,4 @@ KTableX leftJoin(KTable other, BiFunction KTableX leftJoin(KTable other, BiFunction foreignKeyExtractor, ValueJoiner joiner, TableJoined tableJoined, MaterializedX> materialized); - - /** - * Add lineage information to each record in the stream. This will add the following headers - *
      - *
    • {@link LineageHeaders#TOPIC_HEADER}
    • - *
    • {@link LineageHeaders#PARTITION_HEADER}
    • - *
    • {@link LineageHeaders#OFFSET_HEADER}
    • - *
    - * - * @return stream with added headers - */ - KTableX withLineage(); - - /** - * Add lineage information to each record in the stream. This will add the following headers - *
      - *
    • {@link LineageHeaders#TOPIC_HEADER}
    • - *
    • {@link LineageHeaders#PARTITION_HEADER}
    • - *
    • {@link LineageHeaders#OFFSET_HEADER}
    • - *
    - * @param named a {@link Named} config used to name the processor in the topology - * @return stream with added headers - */ - KTableX withLineage(Named named); - - /** - * Add lineage information to each record in the stream. This will add the following headers - *
      - *
    • {@link LineageHeaders#TOPIC_HEADER}
    • - *
    • {@link LineageHeaders#PARTITION_HEADER}
    • - *
    • {@link LineageHeaders#OFFSET_HEADER}
    • - *
    - * @param materialized an instance of {@link Materialized} used to describe how the state store of the resulting - * table should be materialized. Cannot be {@code null} - * @return stream with added headers - */ - KTableX withLineage(MaterializedX> materialized); - - /** - * Add lineage information to each record in the stream. This will add the following headers - *
      - *
    • {@link LineageHeaders#TOPIC_HEADER}
    • - *
    • {@link LineageHeaders#PARTITION_HEADER}
    • - *
    • {@link LineageHeaders#OFFSET_HEADER}
    • - *
    - * @param materialized an instance of {@link Materialized} used to describe how the state store of the resulting - * table should be materialized. Cannot be {@code null} - * @param named a {@link Named} config used to name the processor in the topology - * @return stream with added headers - */ - KTableX withLineage(MaterializedX> materialized, Named named); } diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java index 7929dbf1e..34b7c250b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/KTableXImpl.java @@ -613,27 +613,6 @@ public KTableX leftJoin(final KTable other, materialized.configure(this.context.getConfigurator())); } - @Override - public KTableX withLineage() { - return this.transformValues(LineageTransformer::new); - } - - @Override - public KTableX withLineage(final Named named) { - return this.transformValues(LineageTransformer::new, named); - } - - @Override - public KTableX withLineage(final MaterializedX> materialized) { - return this.transformValues(LineageTransformer::new, materialized); - } - - @Override - public KTableX withLineage(final MaterializedX> materialized, - final Named named) { - return this.transformValues(LineageTransformer::new, materialized, named); - } - @Override public String queryableStoreName() { return this.wrapped.queryableStoreName(); diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java index aa36942fc..7d1bd13d6 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java @@ 
-26,6 +26,7 @@ import com.bakdata.kafka.AppConfiguration; import com.bakdata.kafka.Configurator; +import com.bakdata.kafka.streams.StreamsConfigX; import com.bakdata.kafka.streams.StreamsTopicConfig; import java.util.Collection; import java.util.Map; @@ -35,6 +36,7 @@ import lombok.Value; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.GlobalKTable; @@ -74,14 +76,16 @@ public StreamsBuilderX(final StreamsTopicConfig topics, final Map KStreamX stream(final String topic) { - return this.getContext().wrap(this.streamsBuilder.stream(topic)); + final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topic)); + return createStream(wrap); } /** * @see StreamsBuilder#stream(String, Consumed) */ public KStreamX stream(final String topic, final Consumed consumed) { - return this.getContext().wrap(this.streamsBuilder.stream(topic, consumed)); + final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topic, consumed)); + return createStream(wrap); } /** @@ -95,14 +99,16 @@ public KStreamX stream(final String topic, final ConsumedX co * @see StreamsBuilder#stream(Collection) */ public KStreamX stream(final Collection topics) { - return this.getContext().wrap(this.streamsBuilder.stream(topics)); + final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topics)); + return createStream(wrap); } /** * @see StreamsBuilder#stream(Collection, Consumed) */ public KStreamX stream(final Collection topics, final Consumed consumed) { - return this.getContext().wrap(this.streamsBuilder.stream(topics, consumed)); + final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topics, consumed)); + return createStream(wrap); } /** @@ -117,14 +123,16 @@ public KStreamX stream(final Collection topics, * @see 
StreamsBuilder#stream(Pattern) */ public KStreamX stream(final Pattern topicPattern) { - return this.getContext().wrap(this.streamsBuilder.stream(topicPattern)); + final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topicPattern)); + return createStream(wrap); } /** * @see StreamsBuilder#stream(Pattern, Consumed) */ public KStreamX stream(final Pattern topicPattern, final Consumed consumed) { - return this.getContext().wrap(this.streamsBuilder.stream(topicPattern, consumed)); + final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topicPattern, consumed)); + return createStream(wrap); } /** @@ -136,10 +144,11 @@ public KStreamX stream(final Pattern topicPattern, final ConsumedX< /** * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics()} + * * @param consumed define optional parameters for streaming topics - * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()} * @param type of keys * @param type of values + * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()} * @see StreamsBuilder#stream(Collection, Consumed) */ public KStreamX streamInput(final Consumed consumed) { @@ -148,10 +157,11 @@ public KStreamX streamInput(final Consumed consumed) { /** * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics()} + * * @param consumed define optional parameters for streaming topics - * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()} * @param type of keys * @param type of values + * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()} * @see StreamsBuilder#stream(Collection, Consumed) */ public KStreamX streamInput(final ConsumedX consumed) { @@ -160,9 +170,10 @@ public KStreamX streamInput(final ConsumedX consumed) { /** * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics()} - * @return a {@link KStreamX} for all {@link 
StreamsTopicConfig#getInputTopics()} + * * @param type of keys * @param type of values + * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics()} * @see StreamsBuilder#stream(Collection) */ public KStreamX streamInput() { @@ -171,11 +182,12 @@ public KStreamX streamInput() { /** * Create a {@link KStreamX} from all {@link StreamsTopicConfig#getInputTopics(String)} + * * @param label label of input topics * @param consumed define optional parameters for streaming topics - * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)} * @param type of keys * @param type of values + * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)} * @see StreamsBuilder#stream(Collection, Consumed) */ public KStreamX streamInput(final String label, final Consumed consumed) { @@ -184,11 +196,12 @@ public KStreamX streamInput(final String label, final Consumed type of keys * @param type of values + * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)} * @see StreamsBuilder#stream(Collection, Consumed) */ public KStreamX streamInput(final String label, final ConsumedX consumed) { @@ -197,10 +210,11 @@ public KStreamX streamInput(final String label, final ConsumedX type of keys * @param type of values + * @return a {@link KStreamX} for all {@link StreamsTopicConfig#getInputTopics(String)} * @see StreamsBuilder#stream(Collection) */ public KStreamX streamInput(final String label) { @@ -209,10 +223,11 @@ public KStreamX streamInput(final String label) { /** * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern()} + * * @param consumed define optional parameters for streaming topics - * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()} * @param type of keys * @param type of values + * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()} * @see 
StreamsBuilder#stream(Pattern, Consumed) */ public KStreamX streamInputPattern(final Consumed consumed) { @@ -221,10 +236,11 @@ public KStreamX streamInputPattern(final Consumed consumed) { /** * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern()} + * * @param consumed define optional parameters for streaming topics - * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()} * @param type of keys * @param type of values + * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()} * @see StreamsBuilder#stream(Pattern, Consumed) */ public KStreamX streamInputPattern(final ConsumedX consumed) { @@ -233,9 +249,10 @@ public KStreamX streamInputPattern(final ConsumedX consumed) /** * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern()} - * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()} + * * @param type of keys * @param type of values + * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern()} * @see StreamsBuilder#stream(Pattern) */ public KStreamX streamInputPattern() { @@ -244,11 +261,12 @@ public KStreamX streamInputPattern() { /** * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern(String)} + * * @param label label of input pattern * @param consumed define optional parameters for streaming topics - * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)} * @param type of keys * @param type of values + * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)} * @see StreamsBuilder#stream(Pattern, Consumed) */ public KStreamX streamInputPattern(final String label, final Consumed consumed) { @@ -257,11 +275,12 @@ public KStreamX streamInputPattern(final String label, final Consum /** * 
Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern(String)} + * * @param label label of input pattern * @param consumed define optional parameters for streaming topics - * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)} * @param type of keys * @param type of values + * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)} * @see StreamsBuilder#stream(Pattern, Consumed) */ public KStreamX streamInputPattern(final String label, @@ -271,10 +290,11 @@ public KStreamX streamInputPattern(final String label, /** * Create a {@link KStreamX} from all topics matching {@link StreamsTopicConfig#getInputPattern(String)} + * * @param label label of input pattern - * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)} * @param type of keys * @param type of values + * @return a {@link KStreamX} for all topics matching {@link StreamsTopicConfig#getInputPattern(String)} * @see StreamsBuilder#stream(Pattern) */ public KStreamX streamInputPattern(final String label) { @@ -285,14 +305,16 @@ public KStreamX streamInputPattern(final String label) { * @see StreamsBuilder#table(String) */ public KTableX table(final String topic) { - return this.getContext().wrap(this.streamsBuilder.table(topic)); + final KTableX wrap = this.getContext().wrap(this.streamsBuilder.table(topic)); + return createTable(wrap); } /** * @see StreamsBuilder#table(String, Consumed) */ public KTableX table(final String topic, final Consumed consumed) { - return this.getContext().wrap(this.streamsBuilder.table(topic, consumed)); + final KTableX wrap = this.getContext().wrap(this.streamsBuilder.table(topic, consumed)); + return createTable(wrap); } /** @@ -307,7 +329,8 @@ public KTableX table(final String topic, final ConsumedX cons */ public KTableX table(final String topic, final Materialized> materialized) { - return 
this.getContext().wrap(this.streamsBuilder.table(topic, materialized)); + final KTableX wrap = this.getContext().wrap(this.streamsBuilder.table(topic, materialized)); + return createTable(wrap); } /** @@ -323,7 +346,8 @@ public KTableX table(final String topic, */ public KTableX table(final String topic, final Consumed consumed, final Materialized> materialized) { - return this.getContext().wrap(this.streamsBuilder.table(topic, consumed, materialized)); + final KTableX wrap = this.getContext().wrap(this.streamsBuilder.table(topic, consumed, materialized)); + return createTable(wrap); } /** @@ -418,6 +442,7 @@ public StreamsBuilderX addGlobalStore(final StoreBuilder storeBuil /** * Create {@link Configurator} to configure {@link org.apache.kafka.common.serialization.Serde} and * {@link org.apache.kafka.common.serialization.Serializer} using {@link #kafkaProperties}. + * * @return {@link Configurator} */ public Configurator createConfigurator() { @@ -426,6 +451,7 @@ public Configurator createConfigurator() { /** * Create {@link AppConfiguration} used by this app + * * @return {@link AppConfiguration} */ public AppConfiguration createConfiguration() { @@ -434,6 +460,7 @@ public AppConfiguration createConfiguration() { /** * Create a {@link StreamsContext} to wrap Kafka Streams interfaces + * * @return {@link StreamsContext} */ public StreamsContext getContext() { @@ -442,6 +469,7 @@ public StreamsContext getContext() { /** * Create stores using application context to lazily configures Serdes + * * @return {@link StoresX} */ public StoresX stores() { @@ -457,4 +485,20 @@ public StoresX stores() { public Topology build() { return this.streamsBuilder.build(); } + + private KStreamX createStream(final KStreamX wrap) { + final StreamsConfigX streamsConfigX = new StreamsConfigX(new StreamsConfig(this.kafkaProperties)); + if (streamsConfigX.isLineageEnabled()) { + return wrap.processValues(LineageProcessor::new); + } + return wrap; + } + + private KTableX createTable(final 
KTableX wrap) { + final StreamsConfigX streamsConfigX = new StreamsConfigX(new StreamsConfig(this.kafkaProperties)); + if (streamsConfigX.isLineageEnabled()) { + return wrap.transformValues(LineageTransformer::new); + } + return wrap; + } } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java index a18a57cdb..c3070a976 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KStreamXTest.java @@ -39,11 +39,9 @@ import com.bakdata.kafka.streams.apps.DoubleApp; import com.bakdata.kafka.streams.apps.StringApp; import java.io.File; -import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; import java.util.Map; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.StreamsException; @@ -3639,86 +3637,4 @@ public void buildTopology(final StreamsBuilderX builder) { } } - @Test - void shouldAddLineage() { - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - input.withLineage().to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final List> records = topology.streamOutput().toList(); - this.softly.assertThat(records) - .hasSize(1) - .anySatisfy(rekord -> { - this.softly.assertThat(rekord.key()).isEqualTo("foo"); - this.softly.assertThat(rekord.value()).isEqualTo("bar"); - this.softly.assertThat(rekord.headers().toArray()) - .hasSize(3) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); - this.softly.assertThat(new 
String(header.value(), StandardCharsets.UTF_8)) - .isEqualTo("input"); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); - this.softly.assertThat( - Integer.parseInt(new String(header.value(), - StandardCharsets.UTF_8))) - .isEqualTo(0); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); - this.softly.assertThat( - Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) - .isEqualTo(0L); - }); - }); - } - } - - @Test - void shouldAddLineageNamed() { - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KStreamX input = builder.stream("input"); - input.withLineage(Named.as("lineage")).to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final List> records = topology.streamOutput().toList(); - this.softly.assertThat(records) - .hasSize(1) - .anySatisfy(rekord -> { - this.softly.assertThat(rekord.key()).isEqualTo("foo"); - this.softly.assertThat(rekord.value()).isEqualTo("bar"); - this.softly.assertThat(rekord.headers().toArray()) - .hasSize(3) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); - this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) - .isEqualTo("input"); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); - this.softly.assertThat( - Integer.parseInt(new String(header.value(), - StandardCharsets.UTF_8))) - .isEqualTo(0); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); - this.softly.assertThat( - Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) - .isEqualTo(0L); - }); - }); - } - } - } diff --git 
a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java index 184d4862c..1cc3aa29c 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java @@ -33,19 +33,14 @@ import com.bakdata.kafka.streams.StreamsTopicConfig; import com.bakdata.kafka.streams.apps.DoubleApp; import com.bakdata.kafka.streams.apps.StringApp; -import java.nio.charset.StandardCharsets; import java.time.Duration; -import java.util.List; import java.util.function.Function; -import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Named; import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; import org.apache.kafka.streams.kstream.TableJoined; @@ -1845,186 +1840,4 @@ void shouldHaveQueryableStoreName() { this.softly.assertThat(table.queryableStoreName()).isEqualTo("store"); } - @Test - void shouldAddLineage() { - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KTableX input = builder.table("input"); - input.withLineage().toStream().to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final List> records = topology.streamOutput().toList(); - this.softly.assertThat(records) - .hasSize(1) - .anySatisfy(rekord -> { - 
this.softly.assertThat(rekord.key()).isEqualTo("foo"); - this.softly.assertThat(rekord.value()).isEqualTo("bar"); - this.softly.assertThat(rekord.headers().toArray()) - .hasSize(3) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); - this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) - .isEqualTo("input"); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); - this.softly.assertThat( - Integer.parseInt(new String(header.value(), - StandardCharsets.UTF_8))) - .isEqualTo(0); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); - this.softly.assertThat( - Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) - .isEqualTo(0L); - }); - }); - } - } - - @Test - void shouldAddLineageNamed() { - final StringApp app = new StringApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KTableX input = builder.table("input"); - input.withLineage(Named.as("lineage")).toStream().to("output"); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input().add("foo", "bar"); - final List> records = topology.streamOutput().toList(); - this.softly.assertThat(records) - .hasSize(1) - .anySatisfy(rekord -> { - this.softly.assertThat(rekord.key()).isEqualTo("foo"); - this.softly.assertThat(rekord.value()).isEqualTo("bar"); - this.softly.assertThat(rekord.headers().toArray()) - .hasSize(3) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); - this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) - .isEqualTo("input"); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); - this.softly.assertThat( - Integer.parseInt(new String(header.value(), - StandardCharsets.UTF_8))) - .isEqualTo(0); - }) - 
.anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); - this.softly.assertThat( - Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) - .isEqualTo(0L); - }); - }); - } - } - - @Test - void shouldAddLineageUsingMaterialized() { - final DoubleApp app = new DoubleApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KTableX input = - builder.table("input", Consumed.with(Serdes.String(), Serdes.String())); - input.withLineage(MaterializedX.with(Serdes.String(), Serdes.String())) - .toStream() - .to("output", Produced.with(Serdes.String(), Serdes.String())); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input() - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String()) - .add("foo", "bar"); - final List> records = topology.streamOutput() - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String()) - .toList(); - this.softly.assertThat(records) - .hasSize(1) - .anySatisfy(rekord -> { - this.softly.assertThat(rekord.key()).isEqualTo("foo"); - this.softly.assertThat(rekord.value()).isEqualTo("bar"); - this.softly.assertThat(rekord.headers().toArray()) - .hasSize(3) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); - this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) - .isEqualTo("input"); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); - this.softly.assertThat( - Integer.parseInt(new String(header.value(), - StandardCharsets.UTF_8))) - .isEqualTo(0); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); - this.softly.assertThat( - Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) - .isEqualTo(0L); - }); - }); - } - } - - @Test - void shouldAddLineageNamedUsingMaterialized() { - final DoubleApp app = new 
DoubleApp() { - @Override - public void buildTopology(final StreamsBuilderX builder) { - final KTableX input = - builder.table("input", Consumed.with(Serdes.String(), Serdes.String())); - input.withLineage(MaterializedX.with(Serdes.String(), Serdes.String()), Named.as("lineage")) - .toStream() - .to("output", Produced.with(Serdes.String(), Serdes.String())); - } - }; - try (final TestTopology topology = app.startApp()) { - topology.input() - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String()) - .add("foo", "bar"); - final List> records = topology.streamOutput() - .withKeySerde(Serdes.String()) - .withValueSerde(Serdes.String()) - .toList(); - this.softly.assertThat(records) - .hasSize(1) - .anySatisfy(rekord -> { - this.softly.assertThat(rekord.key()).isEqualTo("foo"); - this.softly.assertThat(rekord.value()).isEqualTo("bar"); - this.softly.assertThat(rekord.headers().toArray()) - .hasSize(3) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); - this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) - .isEqualTo("input"); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); - this.softly.assertThat( - Integer.parseInt(new String(header.value(), - StandardCharsets.UTF_8))) - .isEqualTo(0); - }) - .anySatisfy(header -> { - this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); - this.softly.assertThat( - Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) - .isEqualTo(0L); - }); - }); - } - } - } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/StreamsBuilderXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/StreamsBuilderXTest.java index b4baed195..1034217d8 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/StreamsBuilderXTest.java +++ 
b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/StreamsBuilderXTest.java @@ -26,12 +26,15 @@ import com.bakdata.fluent_kafka_streams_tests.TestTopology; import com.bakdata.kafka.Preconfigured; +import com.bakdata.kafka.streams.StreamsConfigX; import com.bakdata.kafka.streams.StreamsTopicConfig; import com.bakdata.kafka.streams.apps.DoubleApp; import com.bakdata.kafka.streams.apps.StringApp; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.GlobalKTable; @@ -41,10 +44,18 @@ import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +@ExtendWith(SoftAssertionsExtension.class) class StreamsBuilderXTest { + @InjectSoftAssertions + private SoftAssertions softly; + @Test void shouldReadFromInput() { final StringApp app = new StringApp() { @@ -631,4 +642,144 @@ public void buildTopology(final StreamsBuilderX builder) { } } + @Test + void shouldAddLineageToStream() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KStreamX input = builder.stream("input"); + input.to("output"); + } + + @Override + public Map createKafkaProperties() { + return Map.of( + StreamsConfigX.LINEAGE_ENABLED_CONFIG, true + ); + } + }; + try (final TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + 
this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()) + .hasSize(3) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); + this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) + .isEqualTo("input"); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); + this.softly.assertThat( + Integer.parseInt(new String(header.value(), + StandardCharsets.UTF_8))) + .isEqualTo(0); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); + this.softly.assertThat( + Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) + .isEqualTo(0L); + }); + }); + } + } + + @Test + void shouldNotAddLineageToStream() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KStreamX input = builder.stream("input"); + input.to("output"); + } + }; + try (final TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()).isEmpty(); + }); + } + } + + @Test + void shouldAddLineageToTable() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KTableX input = builder.table("input"); + input.toStream().to("output"); + } + + @Override + public Map createKafkaProperties() { + return Map.of( + StreamsConfigX.LINEAGE_ENABLED_CONFIG, true + ); + } + }; + try (final 
TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()) + .hasSize(3) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.TOPIC_HEADER); + this.softly.assertThat(new String(header.value(), StandardCharsets.UTF_8)) + .isEqualTo("input"); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.PARTITION_HEADER); + this.softly.assertThat( + Integer.parseInt(new String(header.value(), + StandardCharsets.UTF_8))) + .isEqualTo(0); + }) + .anySatisfy(header -> { + this.softly.assertThat(header.key()).isEqualTo(LineageHeaders.OFFSET_HEADER); + this.softly.assertThat( + Long.parseLong(new String(header.value(), StandardCharsets.UTF_8))) + .isEqualTo(0L); + }); + }); + } + } + + @Test + void shouldNotAddLineageToTable() { + final StringApp app = new StringApp() { + @Override + public void buildTopology(final StreamsBuilderX builder) { + final KTableX input = builder.table("input"); + input.toStream().to("output"); + } + }; + try (final TestTopology topology = app.startApp()) { + topology.input().add("foo", "bar"); + final List> records = topology.streamOutput().toList(); + this.softly.assertThat(records) + .hasSize(1) + .anySatisfy(rekord -> { + this.softly.assertThat(rekord.key()).isEqualTo("foo"); + this.softly.assertThat(rekord.value()).isEqualTo("bar"); + this.softly.assertThat(rekord.headers().toArray()).isEmpty(); + }); + } + } + } From a187dddfe3c251e3c15de978631577a7b6704127 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Oct 2025 13:12:50 +0100 Subject: [PATCH 09/13] Configure via property --- .../java/com/bakdata/kafka/streams/kstream/LineageHeaders.java | 
2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java index b4ccddfda..79270f9a7 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageHeaders.java @@ -37,7 +37,7 @@ */ @RequiredArgsConstructor(access = AccessLevel.PACKAGE) public class LineageHeaders { - private static final String LINEAGE_PREFIX = "lineage."; + private static final String LINEAGE_PREFIX = "streams.bootstrap.lineage."; /** * Header indicating the topic the record was read from. */ From 8fd0feb9975476dce0375d39ebaf60a2462a6364 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Oct 2025 13:33:34 +0100 Subject: [PATCH 10/13] Configure via property --- .../com/bakdata/kafka/streams/kstream/KTableXTest.java | 7 +++++-- .../bakdata/kafka/streams/kstream/MaterializedXTest.java | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java index 1cc3aa29c..ed757cc5b 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/KTableXTest.java @@ -24,7 +24,6 @@ package com.bakdata.kafka.streams.kstream; -import static java.util.Collections.emptyMap; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -34,6 +33,7 @@ import com.bakdata.kafka.streams.apps.DoubleApp; import com.bakdata.kafka.streams.apps.StringApp; import java.time.Duration; +import java.util.Map; import java.util.function.Function; import 
org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; @@ -1835,7 +1835,10 @@ public void buildTopology(final StreamsBuilderX builder) { @Test void shouldHaveQueryableStoreName() { - final StreamsBuilderX builder = new StreamsBuilderX(StreamsTopicConfig.builder().build(), emptyMap()); + final StreamsBuilderX builder = new StreamsBuilderX(StreamsTopicConfig.builder().build(), Map.of( + "application.id", "app-id", + "bootstrap.servers", "localhost:9092" + )); final KTableX table = builder.stream("input").toTable(Materialized.as("store")); this.softly.assertThat(table.queryableStoreName()).isEqualTo("store"); } diff --git a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/MaterializedXTest.java b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/MaterializedXTest.java index ec9b1d124..590682666 100644 --- a/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/MaterializedXTest.java +++ b/streams-bootstrap-core/src/test/java/com/bakdata/kafka/streams/kstream/MaterializedXTest.java @@ -26,7 +26,6 @@ import static com.bakdata.kafka.KafkaTest.POLL_TIMEOUT; import static com.bakdata.kafka.KafkaTest.SESSION_TIMEOUT; -import static java.util.Collections.emptyMap; import static java.util.concurrent.CompletableFuture.runAsync; import com.bakdata.fluent_kafka_streams_tests.TestTopology; @@ -550,7 +549,10 @@ public void buildTopology(final StreamsBuilderX builder) { @Test void shouldThrowIfRetentionIsTooShort() { - final StreamsBuilderX builder = new StreamsBuilderX(StreamsTopicConfig.builder().build(), emptyMap()); + final StreamsBuilderX builder = new StreamsBuilderX(StreamsTopicConfig.builder().build(), Map.of( + "application.id", "app-id", + "bootstrap.servers", "localhost:9092" + )); final KStreamX input = builder.stream("input"); final KGroupedStreamX grouped = input.groupByKey(); final TimeWindowedKStreamX windowed = From fd87563f367e011d28f57c7de154a0c766718924 Mon Sep 17 
00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Oct 2025 13:53:26 +0100 Subject: [PATCH 11/13] Configure via property --- .../streams/kstream/StreamsBuilderX.java | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java index 7d1bd13d6..1c4a9269a 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/StreamsBuilderX.java @@ -76,16 +76,16 @@ public StreamsBuilderX(final StreamsTopicConfig topics, final Map KStreamX stream(final String topic) { - final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topic)); - return createStream(wrap); + final KStreamX stream = this.getContext().wrap(this.streamsBuilder.stream(topic)); + return initialize(stream); } /** * @see StreamsBuilder#stream(String, Consumed) */ public KStreamX stream(final String topic, final Consumed consumed) { - final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topic, consumed)); - return createStream(wrap); + final KStreamX stream = this.getContext().wrap(this.streamsBuilder.stream(topic, consumed)); + return initialize(stream); } /** @@ -99,16 +99,16 @@ public KStreamX stream(final String topic, final ConsumedX co * @see StreamsBuilder#stream(Collection) */ public KStreamX stream(final Collection topics) { - final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topics)); - return createStream(wrap); + final KStreamX stream = this.getContext().wrap(this.streamsBuilder.stream(topics)); + return initialize(stream); } /** * @see StreamsBuilder#stream(Collection, Consumed) */ public KStreamX stream(final Collection topics, final Consumed consumed) { - final KStreamX wrap = 
this.getContext().wrap(this.streamsBuilder.stream(topics, consumed)); - return createStream(wrap); + final KStreamX stream = this.getContext().wrap(this.streamsBuilder.stream(topics, consumed)); + return initialize(stream); } /** @@ -123,16 +123,16 @@ public KStreamX stream(final Collection topics, * @see StreamsBuilder#stream(Pattern) */ public KStreamX stream(final Pattern topicPattern) { - final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topicPattern)); - return createStream(wrap); + final KStreamX stream = this.getContext().wrap(this.streamsBuilder.stream(topicPattern)); + return initialize(stream); } /** * @see StreamsBuilder#stream(Pattern, Consumed) */ public KStreamX stream(final Pattern topicPattern, final Consumed consumed) { - final KStreamX wrap = this.getContext().wrap(this.streamsBuilder.stream(topicPattern, consumed)); - return createStream(wrap); + final KStreamX stream = this.getContext().wrap(this.streamsBuilder.stream(topicPattern, consumed)); + return initialize(stream); } /** @@ -305,16 +305,16 @@ public KStreamX streamInputPattern(final String label) { * @see StreamsBuilder#table(String) */ public KTableX table(final String topic) { - final KTableX wrap = this.getContext().wrap(this.streamsBuilder.table(topic)); - return createTable(wrap); + final KTableX table = this.getContext().wrap(this.streamsBuilder.table(topic)); + return initialize(table); } /** * @see StreamsBuilder#table(String, Consumed) */ public KTableX table(final String topic, final Consumed consumed) { - final KTableX wrap = this.getContext().wrap(this.streamsBuilder.table(topic, consumed)); - return createTable(wrap); + final KTableX table = this.getContext().wrap(this.streamsBuilder.table(topic, consumed)); + return initialize(table); } /** @@ -329,8 +329,8 @@ public KTableX table(final String topic, final ConsumedX cons */ public KTableX table(final String topic, final Materialized> materialized) { - final KTableX wrap = 
this.getContext().wrap(this.streamsBuilder.table(topic, materialized)); - return createTable(wrap); + final KTableX table = this.getContext().wrap(this.streamsBuilder.table(topic, materialized)); + return initialize(table); } /** @@ -346,8 +346,8 @@ public KTableX table(final String topic, */ public KTableX table(final String topic, final Consumed consumed, final Materialized> materialized) { - final KTableX wrap = this.getContext().wrap(this.streamsBuilder.table(topic, consumed, materialized)); - return createTable(wrap); + final KTableX table = this.getContext().wrap(this.streamsBuilder.table(topic, consumed, materialized)); + return initialize(table); } /** @@ -486,19 +486,19 @@ public Topology build() { return this.streamsBuilder.build(); } - private KStreamX createStream(final KStreamX wrap) { + private KStreamX initialize(final KStreamX stream) { final StreamsConfigX streamsConfigX = new StreamsConfigX(new StreamsConfig(this.kafkaProperties)); if (streamsConfigX.isLineageEnabled()) { - return wrap.processValues(LineageProcessor::new); + return stream.processValues(LineageProcessor::new); } - return wrap; + return stream; } - private KTableX createTable(final KTableX wrap) { + private KTableX initialize(final KTableX table) { final StreamsConfigX streamsConfigX = new StreamsConfigX(new StreamsConfig(this.kafkaProperties)); if (streamsConfigX.isLineageEnabled()) { - return wrap.transformValues(LineageTransformer::new); + return table.transformValues(LineageTransformer::new); } - return wrap; + return table; } } From 0fb5853f5c586c30fda20ef410243c886cfe4bb1 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Oct 2025 13:55:54 +0100 Subject: [PATCH 12/13] Sonar --- .../main/java/com/bakdata/kafka/streams/StreamsConfigX.java | 5 +++++ .../com/bakdata/kafka/streams/kstream/LineageProcessor.java | 2 ++ .../bakdata/kafka/streams/kstream/LineageTransformer.java | 2 ++ 3 files changed, 9 insertions(+) diff --git 
a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java index 645305fa0..6308f32d3 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java @@ -56,6 +56,11 @@ public class StreamsConfigX extends AbstractConfig { @NonNull StreamsConfig streamsConfig; + /** + * Create a new {@code StreamsConfigX} from a {@link StreamsConfig} + * + * @param streamsConfig streams config + */ public StreamsConfigX(final StreamsConfig streamsConfig) { super(CONFIG_DEF, streamsConfig.originals()); this.streamsConfig = streamsConfig; diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java index 6a57196c9..6e5f1950b 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageProcessor.java @@ -25,6 +25,7 @@ package com.bakdata.kafka.streams.kstream; import java.util.Optional; +import lombok.NoArgsConstructor; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.streams.processor.api.FixedKeyProcessor; @@ -32,6 +33,7 @@ import org.apache.kafka.streams.processor.api.FixedKeyRecord; import org.apache.kafka.streams.processor.api.RecordMetadata; +@NoArgsConstructor class LineageProcessor implements FixedKeyProcessor { private FixedKeyProcessorContext context; diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java index 5f15b736d..184218b53 100644 --- 
a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/kstream/LineageTransformer.java @@ -24,9 +24,11 @@ package com.bakdata.kafka.streams.kstream; +import lombok.NoArgsConstructor; import org.apache.kafka.streams.kstream.ValueTransformerWithKey; import org.apache.kafka.streams.processor.ProcessorContext; +@NoArgsConstructor class LineageTransformer implements ValueTransformerWithKey { private ProcessorContext context; From 412decfd57df39b27f2f23dc74bd2a9f42d5f2e2 Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Mon, 27 Oct 2025 13:57:22 +0100 Subject: [PATCH 13/13] Sonar --- .../main/java/com/bakdata/kafka/streams/StreamsConfigX.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java index 6308f32d3..70648cc43 100644 --- a/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java +++ b/streams-bootstrap-core/src/main/java/com/bakdata/kafka/streams/StreamsConfigX.java @@ -91,6 +91,11 @@ public List getBoostrapServers() { return this.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG); } + /** + * Check if adding lineage headers is enabled. This is controlled by {@link #LINEAGE_ENABLED_CONFIG} + * + * @return true if lineage headers are added to streams and tables + */ public boolean isLineageEnabled() { return this.getBoolean(LINEAGE_ENABLED_CONFIG); }