Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -30,45 +30,84 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.ToString;
import lombok.Value;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.HostInfo;

/**
* Class for simplified access to configs provided by {@link StreamsConfig}
*/
@Value
public class StreamsConfigX {
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class StreamsConfigX extends AbstractConfig {

/**
* Name of the boolean config that controls whether lineage headers are added. It is registered in the config
* definition of this class with a default of {@code false} and {@link Importance#LOW}.
*/
public static final String LINEAGE_ENABLED_CONFIG = "streams.bootstrap.lineage.enabled";
// Documentation string registered for LINEAGE_ENABLED_CONFIG; currently empty.
private static final String LINEAGE_ENABLED_DOC = "";
// Config definition used by this class: the definition of StreamsConfig extended by LINEAGE_ENABLED_CONFIG.
private static final ConfigDef CONFIG_DEF = StreamsConfig.configDef()
.define(LINEAGE_ENABLED_CONFIG, Type.BOOLEAN, false, Importance.LOW, LINEAGE_ENABLED_DOC);
// The StreamsConfig this instance was created from; assigned in the constructor.
@NonNull
StreamsConfig streamsConfig;

/**
* Create a new {@code StreamsConfigX} from a {@link StreamsConfig}. The {@link StreamsConfig#originals()} of the
* given config are passed to the super constructor together with the config definition of this class, which
* additionally defines {@link #LINEAGE_ENABLED_CONFIG}.
*
* @param streamsConfig streams config to read the original properties from
*/
public StreamsConfigX(final StreamsConfig streamsConfig) {
// hand the config definition and the original properties to the AbstractConfig constructor
super(CONFIG_DEF, streamsConfig.originals());
// keep a reference to the config this instance was created from
this.streamsConfig = streamsConfig;
}

/**
 * Parse an application server endpoint of the form {@code host:port} into a {@link HostInfo}.
 *
 * <p>Parsing is delegated to {@link HostInfo#buildFromEndpoint(String)} instead of splitting on {@code ':'} by
 * hand. A manual split breaks for hosts that contain colons themselves (e.g. bracketed IPv6 addresses such as
 * {@code [::1]:8080}) and fails with an uninformative {@link ArrayIndexOutOfBoundsException} when the port is
 * missing.
 *
 * @param applicationServerConfig non-empty endpoint string in {@code host:port} format
 * @return host info described by the endpoint
 * @throws org.apache.kafka.common.config.ConfigException if host or port cannot be parsed from the endpoint
 */
private static HostInfo createHostInfo(final String applicationServerConfig) {
    return HostInfo.buildFromEndpoint(applicationServerConfig);
}

/**
 * Get the application id of the underlying {@link StreamsConfig}
 *
 * @return application id
 * @see StreamsConfig#APPLICATION_ID_CONFIG
 */
public String getAppId() {
    // Read from this config, which is built from the originals of the wrapped StreamsConfig.
    // A second, unreachable return statement used to follow here and has been removed.
    return this.getString(StreamsConfig.APPLICATION_ID_CONFIG);
}

/**
 * Get the bootstrap servers of the underlying {@link StreamsConfig}
 *
 * @return list of bootstrap servers
 * @see StreamsConfig#BOOTSTRAP_SERVERS_CONFIG
 */
public List<String> getBoostrapServers() {
    // Read from this config, which is built from the originals of the wrapped StreamsConfig.
    // A second, unreachable return statement used to follow here and has been removed.
    return this.getList(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
}

/**
 * Tell whether lineage headers should be added. The value is read from {@link #LINEAGE_ENABLED_CONFIG}.
 *
 * @return {@code true} if lineage headers are added to streams and tables
 */
public boolean isLineageEnabled() {
    final Boolean lineageEnabled = this.getBoolean(LINEAGE_ENABLED_CONFIG);
    return lineageEnabled;
}

/**
 * Get all configs of the underlying {@link StreamsConfig}
 *
 * @return Kafka configs as an unmodifiable map
 * @see StreamsConfig#originals()
 */
public Map<String, Object> getKafkaProperties() {
    // This config is created from the originals of the wrapped StreamsConfig, so its own originals are used.
    // A second, unreachable return statement used to follow here and has been removed.
    return Collections.unmodifiableMap(this.originals());
}

/**
Expand All @@ -78,13 +117,8 @@ public Map<String, Object> getKafkaProperties() {
* {@link StreamsConfig#APPLICATION_SERVER_CONFIG} is set; otherwise, an empty {@link Optional}.
*/
public Optional<HostInfo> getApplicationServer() {
    // Single declaration of the local; it was previously declared twice, which does not compile.
    final String applicationServerConfig = this.getString(APPLICATION_SERVER_CONFIG);
    if (applicationServerConfig.isEmpty()) {
        // an empty value means no application server is configured
        return Optional.empty();
    }
    // createHostInfo is the private helper declared earlier in this class; the second, identical definition
    // that used to follow this method was a duplicate and has been dropped.
    return Optional.of(createHostInfo(applicationServerConfig));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka.streams.kstream;

import java.nio.charset.StandardCharsets;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.experimental.Accessors;
import org.apache.kafka.common.header.Headers;

/**
 * Fluent helper for attaching data lineage information to the {@link Headers} of a Kafka message.
 *
 * <p>Each {@code add...Header} method either returns this instance unchanged (when the given value is absent) or
 * adds one header to the wrapped {@link Headers} and returns a new wrapper around the result of that call.
 */
@RequiredArgsConstructor(access = AccessLevel.PACKAGE)
public class LineageHeaders {
    private static final String LINEAGE_PREFIX = "streams.bootstrap.lineage.";
    /**
     * Name of the header carrying the topic the record was read from.
     */
    public static final String TOPIC_HEADER = LINEAGE_PREFIX + "topic";
    /**
     * Name of the header carrying the partition the record was read from.
     */
    public static final String PARTITION_HEADER = LINEAGE_PREFIX + "partition";
    /**
     * Name of the header carrying the offset the record was read from.
     */
    public static final String OFFSET_HEADER = LINEAGE_PREFIX + "offset";

    @Getter(AccessLevel.PACKAGE)
    @Accessors(fluent = true)
    private final @NonNull Headers headers;

    /**
     * Add {@link #TOPIC_HEADER} with the UTF-8 bytes of the topic name.
     *
     * @param topic topic the record was read from; {@code null} leaves the headers untouched
     * @return this instance if {@code topic} is {@code null}, otherwise a wrapper around the extended headers
     */
    LineageHeaders addTopicHeader(final String topic) {
        return topic == null ? this : this.withHeader(TOPIC_HEADER, topic);
    }

    /**
     * Add {@link #PARTITION_HEADER} with the partition number.
     *
     * @param partition partition the record was read from; negative values leave the headers untouched
     * @return this instance if {@code partition} is negative, otherwise a wrapper around the extended headers
     */
    LineageHeaders addPartitionHeader(final int partition) {
        //TODO serialize more compact as int? But then UI tools usually can't handle it
        return partition < 0 ? this : this.withHeader(PARTITION_HEADER, Integer.toString(partition));
    }

    /**
     * Add {@link #OFFSET_HEADER} with the offset.
     *
     * @param offset offset the record was read from; negative values leave the headers untouched
     * @return this instance if {@code offset} is negative, otherwise a wrapper around the extended headers
     */
    LineageHeaders addOffsetHeader(final long offset) {
        //TODO serialize more compact as long? But then UI tools usually can't handle it
        return offset < 0 ? this : this.withHeader(OFFSET_HEADER, Long.toString(offset));
    }

    // Adds a single header whose value is the UTF-8 encoding of the given text and wraps the result.
    private LineageHeaders withHeader(final String key, final String text) {
        final Headers extended = this.headers.add(key, text.getBytes(StandardCharsets.UTF_8));
        return new LineageHeaders(extended);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka.streams.kstream;

import java.util.Optional;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.processor.api.RecordMetadata;

/**
 * {@link FixedKeyProcessor} that forwards every record with lineage headers (topic, partition and offset taken
 * from the record metadata of the processor context) added to its headers. Key and value are forwarded as they
 * are. If the context provides no record metadata, the record is forwarded with its existing headers.
 *
 * @param <K> type of keys
 * @param <V> type of values
 */
@NoArgsConstructor
class LineageProcessor<K, V> implements FixedKeyProcessor<K, V, V> {
    private FixedKeyProcessorContext<K, V> context;

    // Builds a new RecordHeaders from the given headers and adds the lineage headers for the given metadata.
    private static Headers addHeaders(final Headers headers, final RecordMetadata metadata) {
        final Headers copy = new RecordHeaders(headers);
        final LineageHeaders lineage = new LineageHeaders(copy)
                .addTopicHeader(metadata.topic())
                .addPartitionHeader(metadata.partition())
                .addOffsetHeader(metadata.offset());
        return lineage.headers();
    }

    @Override
    public void init(final FixedKeyProcessorContext<K, V> context) {
        this.context = context;
    }

    @Override
    public void process(final FixedKeyRecord<K, V> rekord) {
        final Optional<RecordMetadata> metadata = this.context.recordMetadata();
        final Headers existing = rekord.headers();
        // without record metadata there is nothing to add, so the existing headers are reused
        final Headers outgoing = metadata.isPresent() ? addHeaders(existing, metadata.get()) : existing;
        this.context.forward(rekord.withHeaders(outgoing));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka.streams.kstream;

import lombok.NoArgsConstructor;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * {@link ValueTransformerWithKey} that passes values through unchanged and adds lineage headers (topic,
 * partition and offset as reported by the {@link ProcessorContext}) to the headers of the context.
 *
 * @param <K> type of keys
 * @param <V> type of values
 */
@NoArgsConstructor
class LineageTransformer<K, V> implements ValueTransformerWithKey<K, V, V> {

    private ProcessorContext context;

    @Override
    public void init(final ProcessorContext context) {
        this.context = context;
    }

    @Override
    public V transform(final K readOnlyKey, final V value) {
        // The returned LineageHeaders are not used any further; the headers are expected to be added to the
        // Headers object obtained from the context.
        final LineageHeaders initial = new LineageHeaders(this.context.headers());
        final LineageHeaders withTopic = initial.addTopicHeader(this.context.topic());
        final LineageHeaders withPartition = withTopic.addPartitionHeader(this.context.partition());
        withPartition.addOffsetHeader(this.context.offset());
        return value;
    }

    @Override
    public void close() {
        // nothing to release
    }
}
Loading
Loading