Commit 6d3dbe7

[FLINK-32743][Connectors/Kafka] Parse data from Kafka Connect and convert it into regular JSON data

1 parent 79ae2d7
File tree: 11 files changed (+246, -11 lines)

flink-connector-kafka/pom.xml

Lines changed: 8 additions & 0 deletions

@@ -72,8 +72,16 @@ under the License.
 			<version>${kafka.version}</version>
 		</dependency>
 
+
 		<!-- Tests -->
 
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>connect-json</artifactId>
+			<version>${kafka.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.hamcrest</groupId>
 			<artifactId>hamcrest-all</artifactId>

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java

Lines changed: 9 additions & 0 deletions

@@ -345,6 +345,15 @@ public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
         return this;
     }
 
+    public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
+            DeserializationSchema<OUT> deserializationSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
+        this.deserializationSchema =
+                KafkaRecordDeserializationSchema.valueOnly(
+                        deserializationSchema, valueIncludeKafkaConnectJsonSchema);
+        return this;
+    }
+
     /**
      * Sets the client id prefix of this KafkaSource.
      *
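
Usage sketch (illustrative only, not part of this commit): the new overload plugs into the regular source builder; the bootstrap servers, topic, and group id below are placeholder values.

    // Passing 'true' tells the wrapper to strip the Kafka Connect JSON envelope
    // and hand only the "payload" bytes to the inner SimpleStringSchema.
    KafkaSource<String> source =
            KafkaSource.<String>builder()
                    .setBootstrapServers("localhost:9092")
                    .setTopics("input-topic")
                    .setGroupId("demo-group")
                    .setStartingOffsets(OffsetsInitializer.earliest())
                    .setValueOnlyDeserializer(new SimpleStringSchema(), true)
                    .build();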

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java

Lines changed: 7 additions & 0 deletions

@@ -96,6 +96,13 @@ static <V> KafkaRecordDeserializationSchema<V> valueOnly(
         return new KafkaValueOnlyDeserializationSchemaWrapper<>(valueDeserializationSchema);
     }
 
+    static <V> KafkaRecordDeserializationSchema<V> valueOnly(
+            DeserializationSchema<V> valueDeserializationSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
+        return new KafkaValueOnlyDeserializationSchemaWrapper<>(
+                valueDeserializationSchema, valueIncludeKafkaConnectJsonSchema);
+    }
+
     /**
      * Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializationSchema}.
      *
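
Illustrative sketch: the new factory method can also be called directly, which is exactly what the builder overload above does internally; SimpleStringSchema stands in for any value deserializer here.

    // Equivalent to setValueOnlyDeserializer(new SimpleStringSchema(), true).
    KafkaRecordDeserializationSchema<String> recordSchema =
            KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema(), true);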

flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java

Lines changed: 13 additions & 1 deletion

@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;
 import org.apache.flink.util.Collector;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -35,9 +36,17 @@
 class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializationSchema<T> {
     private static final long serialVersionUID = 1L;
     private final DeserializationSchema<T> deserializationSchema;
+    private final boolean valueIncludeKafkaConnectJsonSchema;
 
     KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+        this(deserializationSchema, false);
+    }
+
+    KafkaValueOnlyDeserializationSchemaWrapper(
+            DeserializationSchema<T> deserializationSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
         this.deserializationSchema = deserializationSchema;
+        this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
     }
 
     @Override
@@ -48,7 +57,10 @@ public void open(DeserializationSchema.InitializationContext context) throws Exception {
     @Override
     public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
             throws IOException {
-        deserializationSchema.deserialize(message.value(), out);
+        byte[] extractValue =
+                ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
+                        message.value(), valueIncludeKafkaConnectJsonSchema);
+        deserializationSchema.deserialize(extractValue, out);
     }
 
     @Override
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/util/ExtractPayloadSourceRecordUtil.java

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.util;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** Extracts the payload from a Kafka Connect SourceRecord, filtering out the schema. */
+public class ExtractPayloadSourceRecordUtil {
+
+    private static final String RECORD_PAYLOAD_FIELD = "payload";
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static byte[] extractPayloadIfIncludeConnectSchema(byte[] message, boolean includeSchema)
+            throws IOException {
+        if (includeSchema) {
+            JsonNode jsonNode = deserializeToJsonNode(message);
+            return objectMapper.writeValueAsBytes(jsonNode.get(RECORD_PAYLOAD_FIELD));
+        }
+        return message;
+    }
+
+    private static JsonNode deserializeToJsonNode(byte[] message) throws IOException {
+        return objectMapper.readTree(message);
+    }
+}
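
To make the transformation concrete, a small illustrative sketch; the sample record is invented and assumes java.nio.charset.StandardCharsets is imported. The util simply returns the "payload" subtree of the Connect envelope.

    // A Kafka Connect JSON envelope: schema metadata plus the actual payload.
    String connectEnvelope =
            "{\"schema\":{\"type\":\"struct\",\"fields\":["
                    + "{\"type\":\"int64\",\"field\":\"id\"},"
                    + "{\"type\":\"string\",\"field\":\"name\"}]},"
                    + "\"payload\":{\"id\":1,\"name\":\"flink\"}}";

    byte[] payload =
            ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
                    connectEnvelope.getBytes(StandardCharsets.UTF_8), true);

    // Prints {"id":1,"name":"flink"}: regular JSON with the Connect schema stripped.
    System.out.println(new String(payload, StandardCharsets.UTF_8));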

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java

Lines changed: 21 additions & 4 deletions

@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.flink.table.data.GenericRowData;
@@ -56,6 +57,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
 
     private final boolean upsertMode;
 
+    private final boolean keyIncludeKafkaConnectJsonSchema;
+    private final boolean valueIncludeKafkaConnectJsonSchema;
+
     DynamicKafkaDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -65,7 +69,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
             boolean hasMetadata,
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
-            boolean upsertMode) {
+            boolean upsertMode,
+            boolean keyIncludeKafkaConnectJsonSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -84,6 +90,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
                         upsertMode);
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
+        this.keyIncludeKafkaConnectJsonSchema = keyIncludeKafkaConnectJsonSchema;
+        this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
     }
 
     @Override
@@ -110,13 +118,19 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
         // shortcut in case no output projection is required,
         // also not for a cartesian product with the keys
         if (keyDeserialization == null && !hasMetadata) {
-            valueDeserialization.deserialize(record.value(), collector);
+            byte[] extractRecordValue =
+                    ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
+                            record.value(), valueIncludeKafkaConnectJsonSchema);
+            valueDeserialization.deserialize(extractRecordValue, collector);
             return;
         }
 
         // buffer key(s)
         if (keyDeserialization != null) {
-            keyDeserialization.deserialize(record.key(), keyCollector);
+            byte[] extractRecordKey =
+                    ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
+                            record.key(), keyIncludeKafkaConnectJsonSchema);
+            keyDeserialization.deserialize(extractRecordKey, keyCollector);
         }
 
         // project output while emitting values
@@ -127,7 +141,10 @@ public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
             // collect tombstone messages in upsert mode by hand
             outputCollector.collect(null);
         } else {
-            valueDeserialization.deserialize(record.value(), outputCollector);
+            byte[] extractRecordValue =
+                    ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
+                            record.value(), valueIncludeKafkaConnectJsonSchema);
+            valueDeserialization.deserialize(extractRecordValue, outputCollector);
         }
         keyCollector.buffer.clear();
     }

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java

Lines changed: 14 additions & 0 deletions

@@ -194,6 +194,20 @@ public class KafkaConnectorOptions {
                                     + "The value 0 disables the partition discovery."
                                     + "The default value is 5 minutes, which is equal to the default value of metadata.max.age.ms in Kafka.");
 
+    public static final ConfigOption<Boolean> RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA =
+            ConfigOptions.key("record.key.include.kafka.connect.json.schema")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether the record key from Kafka includes the Kafka Connect JSON schema. Defaults to false, in which case the key is used as-is; set to true to parse the Connect envelope and extract only its payload.");
+
+    public static final ConfigOption<Boolean> RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA =
+            ConfigOptions.key("record.value.include.kafka.connect.json.schema")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether the record value from Kafka includes the Kafka Connect JSON schema. Defaults to false, in which case the value is used as-is; set to true to parse the Connect envelope and extract only its payload.");
+
     // --------------------------------------------------------------------------------------------
     // Sink specific options
     // --------------------------------------------------------------------------------------------

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java

Lines changed: 56 additions & 3 deletions

@@ -171,6 +171,9 @@ public class KafkaDynamicSource
 
     protected final String tableIdentifier;
 
+    protected final boolean keyIncludeKafkaConnectJsonSchema;
+    protected final boolean valueIncludeKafkaConnectJsonSchema;
+
     public KafkaDynamicSource(
             DataType physicalDataType,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -189,6 +192,48 @@ public KafkaDynamicSource(
             long boundedTimestampMillis,
             boolean upsertMode,
             String tableIdentifier) {
+        this(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                boundedMode,
+                specificBoundedOffsets,
+                boundedTimestampMillis,
+                upsertMode,
+                tableIdentifier,
+                false,
+                false);
+    }
+
+    public KafkaDynamicSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
+            Properties properties,
+            StartupMode startupMode,
+            Map<KafkaTopicPartition, Long> specificStartupOffsets,
+            long startupTimestampMillis,
+            BoundedMode boundedMode,
+            Map<KafkaTopicPartition, Long> specificBoundedOffsets,
+            long boundedTimestampMillis,
+            boolean upsertMode,
+            String tableIdentifier,
+            boolean keyIncludeKafkaConnectJsonSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -228,6 +273,8 @@ public KafkaDynamicSource(
         this.boundedTimestampMillis = boundedTimestampMillis;
         this.upsertMode = upsertMode;
         this.tableIdentifier = tableIdentifier;
+        this.keyIncludeKafkaConnectJsonSchema = keyIncludeKafkaConnectJsonSchema;
+        this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
     }
 
     @Override
@@ -344,7 +391,9 @@ public DynamicTableSource copy() {
                         specificBoundedOffsets,
                         boundedTimestampMillis,
                         upsertMode,
-                        tableIdentifier);
+                        tableIdentifier,
+                        keyIncludeKafkaConnectJsonSchema,
+                        valueIncludeKafkaConnectJsonSchema);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -384,7 +433,9 @@ public boolean equals(Object o) {
                 && boundedTimestampMillis == that.boundedTimestampMillis
                 && Objects.equals(upsertMode, that.upsertMode)
                 && Objects.equals(tableIdentifier, that.tableIdentifier)
-                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
+                && Objects.equals(watermarkStrategy, that.watermarkStrategy)
+                && keyIncludeKafkaConnectJsonSchema == that.keyIncludeKafkaConnectJsonSchema
+                && valueIncludeKafkaConnectJsonSchema == that.valueIncludeKafkaConnectJsonSchema;
     }
 
     @Override
@@ -550,7 +601,9 @@ private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
                 hasMetadata,
                 metadataConverters,
                 producedTypeInfo,
-                upsertMode);
+                upsertMode,
+                keyIncludeKafkaConnectJsonSchema,
+                valueIncludeKafkaConnectJsonSchema);
     }
 
     private @Nullable DeserializationSchema<RowData> createDeserialization(

flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java

Lines changed: 23 additions & 3 deletions

@@ -71,6 +71,8 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
@@ -152,6 +154,8 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_BOUNDED_MODE);
         options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
         options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+        options.add(RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA);
+        options.add(RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA);
         return options;
     }
 
@@ -215,6 +219,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
 
+        final boolean keyIncludeKafkaConnectJsonSchema =
+                tableOptions
+                        .getOptional(RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA)
+                        .orElse(false);
+
+        final boolean valueIncludeKafkaConnectJsonSchema =
+                tableOptions
+                        .getOptional(RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA)
+                        .orElse(false);
+
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -231,7 +245,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 boundedOptions.boundedMode,
                 boundedOptions.specificOffsets,
                 boundedOptions.boundedTimestampMillis,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                keyIncludeKafkaConnectJsonSchema,
+                valueIncludeKafkaConnectJsonSchema);
     }
 
     @Override
@@ -395,7 +411,9 @@ protected KafkaDynamicSource createKafkaTableSource(
             BoundedMode boundedMode,
             Map<KafkaTopicPartition, Long> specificEndOffsets,
             long endTimestampMillis,
-            String tableIdentifier) {
+            String tableIdentifier,
+            boolean keyIncludeKafkaConnectJsonSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -413,7 +431,9 @@ protected KafkaDynamicSource createKafkaTableSource(
                 specificEndOffsets,
                 endTimestampMillis,
                 false,
-                tableIdentifier);
+                tableIdentifier,
+                keyIncludeKafkaConnectJsonSchema,
+                valueIncludeKafkaConnectJsonSchema);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
