[FLINK-32743][Connectors/Kafka] Parse data from Kafka Connect and convert it into regular JSON data

sunxiaojian committed Jan 26, 2024
1 parent 8d53189 commit cf522ef
Showing 11 changed files with 246 additions and 15 deletions.
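In short: Kafka Connect's JsonConverter, with schemas enabled, wraps every record in an envelope of the form {"schema": {...}, "payload": {...}}. This commit lets the connector strip that envelope and hand only the payload to the configured deserializer. Below is a minimal sketch of the new DataStream API overload introduced here; the bootstrap server, topic, group id, and the choice of SimpleStringSchema are placeholders, not part of this commit.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class ConnectJsonSourceExample {
    public static void main(String[] args) {
        // Records written by Kafka Connect's JsonConverter with schemas enabled look like
        //   {"schema": {...}, "payload": {"id": 1, "name": "flink"}}
        // Passing true as the second argument makes the source strip that envelope and
        // hand only the "payload" element to the wrapped DeserializationSchema.
        KafkaSource<String> source =
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")   // placeholder
                        .setTopics("connect-topic")              // placeholder
                        .setGroupId("demo-group")                // placeholder
                        .setStartingOffsets(OffsetsInitializer.earliest())
                        .setValueOnlyDeserializer(new SimpleStringSchema(), true)
                        .build();
    }
}
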
8 changes: 7 additions & 1 deletion flink-connector-kafka/pom.xml
@@ -85,9 +85,15 @@ under the License.
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- Tests -->

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
KafkaSourceBuilder.java
@@ -350,6 +350,15 @@ public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
return this;
}

public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
DeserializationSchema<OUT> deserializationSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
this.deserializationSchema =
KafkaRecordDeserializationSchema.valueOnly(
deserializationSchema, valueIncludeKafkaConnectJsonSchema);
return this;
}

/**
* Sets the client id prefix of this KafkaSource.
*
KafkaRecordDeserializationSchema.java
@@ -96,6 +96,13 @@ static <V> KafkaRecordDeserializationSchema<V> valueOnly(
return new KafkaValueOnlyDeserializationSchemaWrapper<>(valueDeserializationSchema);
}

static <V> KafkaRecordDeserializationSchema<V> valueOnly(
DeserializationSchema<V> valueDeserializationSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
return new KafkaValueOnlyDeserializationSchemaWrapper<>(
valueDeserializationSchema, valueIncludeKafkaConnectJsonSchema);
}

/**
* Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializationSchema}.
*
KafkaValueOnlyDeserializationSchemaWrapper.java
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;
import org.apache.flink.util.Collector;

import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -35,9 +36,17 @@
class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializationSchema<T> {
private static final long serialVersionUID = 1L;
private final DeserializationSchema<T> deserializationSchema;
private final boolean valueIncludeKafkaConnectJsonSchema;

KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
this(deserializationSchema, false);
}

KafkaValueOnlyDeserializationSchemaWrapper(
DeserializationSchema<T> deserializationSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
this.deserializationSchema = deserializationSchema;
this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
}

@Override
@@ -48,7 +57,10 @@ public void open(DeserializationSchema.InitializationContext context) throws Exc
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
throws IOException {
deserializationSchema.deserialize(message.value(), out);
byte[] extractValue =
ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
message.value(), valueIncludeKafkaConnectJsonSchema);
deserializationSchema.deserialize(extractValue, out);
}

@Override
ExtractPayloadSourceRecordUtil.java (new file)
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.kafka.source.util;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;

/** Extracts the payload from a Kafka Connect SourceRecord, filtering out the schema. */
public class ExtractPayloadSourceRecordUtil {

private static final String RECORD_PAYLOAD_FIELD = "payload";
private static final ObjectMapper objectMapper = new ObjectMapper();

public static byte[] extractPayloadIfIncludeConnectSchema(byte[] message, boolean includeSchema)
throws IOException {
if (includeSchema) {
JsonNode jsonNode = deserializeToJsonNode(message);
return objectMapper.writeValueAsBytes(jsonNode.get(RECORD_PAYLOAD_FIELD));
}
return message;
}

private static JsonNode deserializeToJsonNode(byte[] message) throws IOException {
return objectMapper.readTree(message);
}
}
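For illustration, a small sketch of what the helper above does, using a made-up Connect-style record; with includeSchema set to false the bytes pass through unchanged.

import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class ExtractPayloadDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical Kafka Connect style record: schema envelope plus payload.
        byte[] connectRecord =
                "{\"schema\":{\"type\":\"struct\"},\"payload\":{\"id\":1,\"name\":\"flink\"}}"
                        .getBytes(StandardCharsets.UTF_8);

        // includeSchema = true: only the "payload" element is kept.
        byte[] payload =
                ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(connectRecord, true);
        System.out.println(new String(payload, StandardCharsets.UTF_8));
        // expected: {"id":1,"name":"flink"}

        // includeSchema = false: the original bytes are returned untouched.
        byte[] unchanged =
                ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(connectRecord, false);
        System.out.println(new String(unchanged, StandardCharsets.UTF_8));
    }
}
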
DynamicKafkaDeserializationSchema.java
@@ -20,6 +20,7 @@

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.table.data.GenericRowData;
@@ -56,6 +57,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro

private final boolean upsertMode;

private final boolean keyIncludeKafkaConnectJsonSchema;
private final boolean valueIncludeKafkaConnectJsonSchema;

DynamicKafkaDeserializationSchema(
int physicalArity,
@Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -65,7 +69,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
boolean hasMetadata,
MetadataConverter[] metadataConverters,
TypeInformation<RowData> producedTypeInfo,
boolean upsertMode) {
boolean upsertMode,
boolean keyIncludeKafkaConnectJsonSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
if (upsertMode) {
Preconditions.checkArgument(
keyDeserialization != null && keyProjection.length > 0,
@@ -84,6 +90,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<Ro
upsertMode);
this.producedTypeInfo = producedTypeInfo;
this.upsertMode = upsertMode;
this.keyIncludeKafkaConnectJsonSchema = keyIncludeKafkaConnectJsonSchema;
this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
}

@Override
@@ -110,13 +118,19 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// shortcut in case no output projection is required,
// also not for a cartesian product with the keys
if (keyDeserialization == null && !hasMetadata) {
valueDeserialization.deserialize(record.value(), collector);
byte[] extractRecordValue =
ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
record.value(), valueIncludeKafkaConnectJsonSchema);
valueDeserialization.deserialize(extractRecordValue, collector);
return;
}

// buffer key(s)
if (keyDeserialization != null) {
keyDeserialization.deserialize(record.key(), keyCollector);
byte[] extractRecordKey =
ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
record.key(), keyIncludeKafkaConnectJsonSchema);
keyDeserialization.deserialize(extractRecordKey, keyCollector);
}

// project output while emitting values
@@ -127,7 +141,10 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData
// collect tombstone messages in upsert mode by hand
outputCollector.collect(null);
} else {
valueDeserialization.deserialize(record.value(), outputCollector);
byte[] extractRecordValue =
ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
record.value(), valueIncludeKafkaConnectJsonSchema);
valueDeserialization.deserialize(extractRecordValue, outputCollector);
}
keyCollector.buffer.clear();
}
KafkaConnectorOptions.java
@@ -194,6 +194,20 @@ public class KafkaConnectorOptions
+ "The value 0 disables the partition discovery."
+ "The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka.");

public static final ConfigOption<Boolean> RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA =
ConfigOptions.key("record.key.include.kafka.connect.json.schema")
.booleanType()
.defaultValue(false)
.withDescription(
"The record key from Kafka contains the JSON schema of Kafka connect. If it defaults to false, it does not need to be parsed. If it is configured to true, it needs to be parsed.");

public static final ConfigOption<Boolean> RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA =
ConfigOptions.key("record.value.include.kafka.connect.json.schema")
.booleanType()
.defaultValue(false)
.withDescription(
"The record value from Kafka contains the JSON schema of Kafka connect. If it defaults to false, it does not need to be parsed. If it is configured to true, it needs to be parsed.");

// --------------------------------------------------------------------------------------------
// Sink specific options
// --------------------------------------------------------------------------------------------
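A sketch of how the two new table options could be used from the Table API; the table name, topic, connection properties, and the choice of the json format are placeholders rather than anything mandated by this commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ConnectJsonTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Placeholder table/topic/server names. The new option tells the connector that
        // the record value carries a Kafka Connect schema envelope to be stripped before
        // the value format decodes it; the analogous key-side option is
        // 'record.key.include.kafka.connect.json.schema'.
        tEnv.executeSql(
                "CREATE TABLE connect_source (\n"
                        + "  id BIGINT,\n"
                        + "  name STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'connect-topic',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'demo-group',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json',\n"
                        + "  'record.value.include.kafka.connect.json.schema' = 'true'\n"
                        + ")");
    }
}
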
KafkaDynamicSource.java
@@ -171,6 +171,9 @@ public class KafkaDynamicSource

protected final String tableIdentifier;

protected final boolean keyIncludeKafkaConnectJsonSchema;
protected final boolean valueIncludeKafkaConnectJsonSchema;

public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -189,6 +192,48 @@ public KafkaDynamicSource(
long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier) {
this(
physicalDataType,
keyDecodingFormat,
valueDecodingFormat,
keyProjection,
valueProjection,
keyPrefix,
topics,
topicPattern,
properties,
startupMode,
specificStartupOffsets,
startupTimestampMillis,
boundedMode,
specificBoundedOffsets,
boundedTimestampMillis,
upsertMode,
tableIdentifier,
false,
false);
}

public KafkaDynamicSource(
DataType physicalDataType,
@Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
int[] keyProjection,
int[] valueProjection,
@Nullable String keyPrefix,
@Nullable List<String> topics,
@Nullable Pattern topicPattern,
Properties properties,
StartupMode startupMode,
Map<KafkaTopicPartition, Long> specificStartupOffsets,
long startupTimestampMillis,
BoundedMode boundedMode,
Map<KafkaTopicPartition, Long> specificBoundedOffsets,
long boundedTimestampMillis,
boolean upsertMode,
String tableIdentifier,
boolean keyIncludeKafkaConnectJsonSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
// Format attributes
this.physicalDataType =
Preconditions.checkNotNull(
@@ -228,6 +273,8 @@ public KafkaDynamicSource(
this.boundedTimestampMillis = boundedTimestampMillis;
this.upsertMode = upsertMode;
this.tableIdentifier = tableIdentifier;
this.keyIncludeKafkaConnectJsonSchema = keyIncludeKafkaConnectJsonSchema;
this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
}

@Override
@@ -344,7 +391,9 @@ public DynamicTableSource copy() {
specificBoundedOffsets,
boundedTimestampMillis,
upsertMode,
tableIdentifier);
tableIdentifier,
keyIncludeKafkaConnectJsonSchema,
valueIncludeKafkaConnectJsonSchema);
copy.producedDataType = producedDataType;
copy.metadataKeys = metadataKeys;
copy.watermarkStrategy = watermarkStrategy;
@@ -384,7 +433,9 @@ public boolean equals(Object o) {
&& boundedTimestampMillis == that.boundedTimestampMillis
&& Objects.equals(upsertMode, that.upsertMode)
&& Objects.equals(tableIdentifier, that.tableIdentifier)
&& Objects.equals(watermarkStrategy, that.watermarkStrategy);
&& Objects.equals(watermarkStrategy, that.watermarkStrategy)
&& keyIncludeKafkaConnectJsonSchema == that.keyIncludeKafkaConnectJsonSchema
&& valueIncludeKafkaConnectJsonSchema == that.valueIncludeKafkaConnectJsonSchema;
}

@Override
@@ -550,7 +601,9 @@ private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
hasMetadata,
metadataConverters,
producedTypeInfo,
upsertMode);
upsertMode,
keyIncludeKafkaConnectJsonSchema,
valueIncludeKafkaConnectJsonSchema);
}

private @Nullable DeserializationSchema<RowData> createDeserialization(
KafkaDynamicTableFactory.java
@@ -71,6 +71,8 @@
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
@@ -152,6 +154,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_BOUNDED_MODE);
options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
options.add(RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA);
options.add(RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA);
return options;
}

@@ -215,6 +219,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {

final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);

final boolean keyIncludeKafkaConnectJsonSchema =
tableOptions
.getOptional(RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA)
.orElse(false);

final boolean valueIncludeKafkaConnectJsonSchema =
tableOptions
.getOptional(RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA)
.orElse(false);

return createKafkaTableSource(
physicalDataType,
keyDecodingFormat.orElse(null),
@@ -231,7 +245,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boundedOptions.boundedMode,
boundedOptions.specificOffsets,
boundedOptions.boundedTimestampMillis,
context.getObjectIdentifier().asSummaryString());
context.getObjectIdentifier().asSummaryString(),
keyIncludeKafkaConnectJsonSchema,
valueIncludeKafkaConnectJsonSchema);
}

@Override
Expand Down Expand Up @@ -395,7 +411,9 @@ protected KafkaDynamicSource createKafkaTableSource(
BoundedMode boundedMode,
Map<KafkaTopicPartition, Long> specificEndOffsets,
long endTimestampMillis,
String tableIdentifier) {
String tableIdentifier,
boolean keyIncludeKafkaConnectJsonSchema,
boolean valueIncludeKafkaConnectJsonSchema) {
return new KafkaDynamicSource(
physicalDataType,
keyDecodingFormat,
@@ -413,7 +431,9 @@ protected KafkaDynamicSource createKafkaTableSource(
specificEndOffsets,
endTimestampMillis,
false,
tableIdentifier);
tableIdentifier,
keyIncludeKafkaConnectJsonSchema,
valueIncludeKafkaConnectJsonSchema);
}

protected KafkaDynamicSink createKafkaTableSink(