diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index 40d6a9f3b..421e115c3 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -85,9 +85,15 @@ under the License.
 		<dependency>
 			<groupId>com.google.guava</groupId>
 			<artifactId>guava</artifactId>
 		</dependency>
-
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>connect-json</artifactId>
+			<version>${kafka.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.hamcrest</groupId>
 			<artifactId>hamcrest-all</artifactId>
 		</dependency>
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
index dcad476ba..d7f164cc5 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java
@@ -350,6 +350,15 @@ public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
         return this;
     }
 
+    public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
+            DeserializationSchema<OUT> deserializationSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
+        this.deserializationSchema =
+                KafkaRecordDeserializationSchema.valueOnly(
+                        deserializationSchema, valueIncludeKafkaConnectJsonSchema);
+        return this;
+    }
+
     /**
      * Sets the client id prefix of this KafkaSource.
      *
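Usage sketch for the new builder overload on the DataStream API side. The topic, bootstrap servers and group id below are placeholders rather than values taken from this change; the boolean is simply forwarded to the value-only deserialization wrapper, and callers of the existing single-argument overload keep the old behaviour because the flag defaults to false.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("connect-output-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                // true = every value is a Kafka Connect JSON envelope
                // {"schema": ..., "payload": ...}; only the payload bytes are handed
                // to the wrapped DeserializationSchema.
                .setValueOnlyDeserializer(new SimpleStringSchema(), true)
                .build();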
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
index 6ad6607c9..1e691de42 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java
@@ -96,6 +96,13 @@ static <V> KafkaRecordDeserializationSchema<V> valueOnly(
         return new KafkaValueOnlyDeserializationSchemaWrapper<>(valueDeserializationSchema);
     }
 
+    static <V> KafkaRecordDeserializationSchema<V> valueOnly(
+            DeserializationSchema<V> valueDeserializationSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
+        return new KafkaValueOnlyDeserializationSchemaWrapper<>(
+                valueDeserializationSchema, valueIncludeKafkaConnectJsonSchema);
+    }
+
     /**
      * Wraps a Kafka {@link Deserializer} to a {@link KafkaRecordDeserializationSchema}.
      *
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java
index 209f5e15c..357dcd78f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaValueOnlyDeserializationSchemaWrapper.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;
 import org.apache.flink.util.Collector;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -35,9 +36,17 @@ class KafkaValueOnlyDeserializationSchemaWrapper<T>
         implements KafkaRecordDeserializationSchema<T> {
     private static final long serialVersionUID = 1L;
     private final DeserializationSchema<T> deserializationSchema;
+    private final boolean valueIncludeKafkaConnectJsonSchema;
 
     KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+        this(deserializationSchema, false);
+    }
+
+    KafkaValueOnlyDeserializationSchemaWrapper(
+            DeserializationSchema<T> deserializationSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
         this.deserializationSchema = deserializationSchema;
+        this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
     }
 
     @Override
@@ -48,7 +57,10 @@ public void open(DeserializationSchema.InitializationContext context) throws Exception {
     @Override
     public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
             throws IOException {
-        deserializationSchema.deserialize(message.value(), out);
+        byte[] extractValue =
+                ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(
+                        message.value(), valueIncludeKafkaConnectJsonSchema);
+        deserializationSchema.deserialize(extractValue, out);
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/util/ExtractPayloadSourceRecordUtil.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/util/ExtractPayloadSourceRecordUtil.java
new file mode 100644
index 000000000..10ad89af5
--- /dev/null
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/util/ExtractPayloadSourceRecordUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.source.util;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+
+/** Extracts the payload from a Kafka Connect JSON record, filtering out the embedded schema. */
+public class ExtractPayloadSourceRecordUtil {
+
+    private static final String RECORD_PAYLOAD_FIELD = "payload";
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
+    public static byte[] extractPayloadIfIncludeConnectSchema(byte[] message, boolean includeSchema)
+            throws IOException {
+        // Null (tombstone) keys/values are passed through untouched.
+        if (includeSchema && message != null) {
+            JsonNode jsonNode = deserializeToJsonNode(message);
+            return objectMapper.writeValueAsBytes(jsonNode.get(RECORD_PAYLOAD_FIELD));
+        }
+        return message;
+    }
+
+    private static JsonNode deserializeToJsonNode(byte[] message) throws IOException {
+        return objectMapper.readTree(message);
+    }
+}
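To make the extraction concrete, a minimal sketch of what the utility returns for a Connect-style value. The envelope below is hand-written for illustration; only extractPayloadIfIncludeConnectSchema comes from this change.

import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;

import java.nio.charset.StandardCharsets;

public class ExtractPayloadSketch {
    public static void main(String[] args) throws Exception {
        // A value as produced by Kafka Connect's JsonConverter with schemas.enable=true.
        byte[] envelope =
                ("{\"schema\":{\"type\":\"struct\",\"fields\":["
                                + "{\"type\":\"int32\",\"optional\":false,\"field\":\"index\"},"
                                + "{\"type\":\"string\",\"optional\":false,\"field\":\"word\"}],"
                                + "\"optional\":false},"
                                + "\"payload\":{\"index\":4,\"word\":\"world\"}}")
                        .getBytes(StandardCharsets.UTF_8);

        // With the flag enabled only the payload survives, roughly {"index":4,"word":"world"}.
        byte[] payload =
                ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(envelope, true);
        System.out.println(new String(payload, StandardCharsets.UTF_8));

        // With the flag disabled the bytes are passed through unchanged.
        byte[] untouched =
                ExtractPayloadSourceRecordUtil.extractPayloadIfIncludeConnectSchema(envelope, false);
        System.out.println(new String(untouched, StandardCharsets.UTF_8));
    }
}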
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
index 91798281d..67cbc6d01 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.connector.kafka.source.util.ExtractPayloadSourceRecordUtil;
 import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
 import org.apache.flink.table.data.GenericRowData;
@@ -56,6 +57,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
 
     private final boolean upsertMode;
 
+    private final boolean keyIncludeKafkaConnectJsonSchema;
+    private final boolean valueIncludeKafkaConnectJsonSchema;
+
     DynamicKafkaDeserializationSchema(
             int physicalArity,
             @Nullable DeserializationSchema<RowData> keyDeserialization,
@@ -65,7 +69,9 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
             MetadataConverter[] metadataConverters,
             TypeInformation<RowData> producedTypeInfo,
-            boolean upsertMode) {
+            boolean upsertMode,
+            boolean keyIncludeKafkaConnectJsonSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
         if (upsertMode) {
             Preconditions.checkArgument(
                     keyDeserialization != null && keyProjection.length > 0,
@@ -84,6 +90,8 @@ class DynamicKafkaDeserializationSchema implements KafkaDeserializationSchema<RowData> {
         this.producedTypeInfo = producedTypeInfo;
         this.upsertMode = upsertMode;
+        this.keyIncludeKafkaConnectJsonSchema = keyIncludeKafkaConnectJsonSchema;
+        this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
     }
 
     @Override
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+    public static final ConfigOption<Boolean> RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA =
+            ConfigOptions.key("record.key.include.kafka.connect.json.schema")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether the record key is wrapped in a Kafka Connect JSON envelope that "
+                                    + "includes the schema. Defaults to false, i.e. the key bytes are used as-is; "
+                                    + "set it to true to extract the 'payload' field before the key format is applied.");
+
+    public static final ConfigOption<Boolean> RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA =
+            ConfigOptions.key("record.value.include.kafka.connect.json.schema")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether the record value is wrapped in a Kafka Connect JSON envelope that "
+                                    + "includes the schema. Defaults to false, i.e. the value bytes are used as-is; "
+                                    + "set it to true to extract the 'payload' field before the value format is applied.");
+
     // --------------------------------------------------------------------------------------------
     // Sink specific options
     // --------------------------------------------------------------------------------------------
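Usage sketch for the new table options from SQL. The table name, columns, topic and servers are placeholders; the example assumes the topic is written by a Kafka Connect JSON converter with schemas.enable=true, so every value is a {"schema": ..., "payload": ...} envelope.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ConnectJsonTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        tEnv.executeSql(
                "CREATE TABLE words (\n"
                        + "  `index` INT,\n"
                        + "  word STRING\n"
                        + ") WITH (\n"
                        + "  'connector' = 'kafka',\n"
                        + "  'topic' = 'connect-output-topic',\n"
                        + "  'properties.bootstrap.servers' = 'localhost:9092',\n"
                        + "  'properties.group.id' = 'my-group',\n"
                        + "  'scan.startup.mode' = 'earliest-offset',\n"
                        + "  'format' = 'json',\n"
                        // New option from this change: strip the Connect envelope so the
                        // json format only sees {"index": ..., "word": ...}.
                        + "  'record.value.include.kafka.connect.json.schema' = 'true'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM words").print();
    }
}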
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
index c963da762..ba74d59a0 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSource.java
@@ -171,6 +171,9 @@ public class KafkaDynamicSource
 
     protected final String tableIdentifier;
 
+    protected final boolean keyIncludeKafkaConnectJsonSchema;
+    protected final boolean valueIncludeKafkaConnectJsonSchema;
+
     public KafkaDynamicSource(
             DataType physicalDataType,
             @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
@@ -189,6 +192,48 @@ public KafkaDynamicSource(
             long boundedTimestampMillis,
             boolean upsertMode,
             String tableIdentifier) {
+        this(
+                physicalDataType,
+                keyDecodingFormat,
+                valueDecodingFormat,
+                keyProjection,
+                valueProjection,
+                keyPrefix,
+                topics,
+                topicPattern,
+                properties,
+                startupMode,
+                specificStartupOffsets,
+                startupTimestampMillis,
+                boundedMode,
+                specificBoundedOffsets,
+                boundedTimestampMillis,
+                upsertMode,
+                tableIdentifier,
+                false,
+                false);
+    }
+
+    public KafkaDynamicSource(
+            DataType physicalDataType,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> keyDecodingFormat,
+            DecodingFormat<DeserializationSchema<RowData>> valueDecodingFormat,
+            int[] keyProjection,
+            int[] valueProjection,
+            @Nullable String keyPrefix,
+            @Nullable List<String> topics,
+            @Nullable Pattern topicPattern,
+            Properties properties,
+            StartupMode startupMode,
+            Map<KafkaTopicPartition, Long> specificStartupOffsets,
+            long startupTimestampMillis,
+            BoundedMode boundedMode,
+            Map<KafkaTopicPartition, Long> specificBoundedOffsets,
+            long boundedTimestampMillis,
+            boolean upsertMode,
+            String tableIdentifier,
+            boolean keyIncludeKafkaConnectJsonSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
         // Format attributes
         this.physicalDataType =
                 Preconditions.checkNotNull(
@@ -228,6 +273,8 @@ public KafkaDynamicSource(
         this.boundedTimestampMillis = boundedTimestampMillis;
         this.upsertMode = upsertMode;
         this.tableIdentifier = tableIdentifier;
+        this.keyIncludeKafkaConnectJsonSchema = keyIncludeKafkaConnectJsonSchema;
+        this.valueIncludeKafkaConnectJsonSchema = valueIncludeKafkaConnectJsonSchema;
     }
 
     @Override
@@ -344,7 +391,9 @@ public DynamicTableSource copy() {
                         specificBoundedOffsets,
                         boundedTimestampMillis,
                         upsertMode,
-                        tableIdentifier);
+                        tableIdentifier,
+                        keyIncludeKafkaConnectJsonSchema,
+                        valueIncludeKafkaConnectJsonSchema);
         copy.producedDataType = producedDataType;
         copy.metadataKeys = metadataKeys;
         copy.watermarkStrategy = watermarkStrategy;
@@ -384,7 +433,9 @@ public boolean equals(Object o) {
                 && boundedTimestampMillis == that.boundedTimestampMillis
                 && Objects.equals(upsertMode, that.upsertMode)
                 && Objects.equals(tableIdentifier, that.tableIdentifier)
-                && Objects.equals(watermarkStrategy, that.watermarkStrategy);
+                && Objects.equals(watermarkStrategy, that.watermarkStrategy)
+                && keyIncludeKafkaConnectJsonSchema == that.keyIncludeKafkaConnectJsonSchema
+                && valueIncludeKafkaConnectJsonSchema == that.valueIncludeKafkaConnectJsonSchema;
     }
 
     @Override
@@ -550,7 +601,9 @@ private KafkaDeserializationSchema<RowData> createKafkaDeserializationSchema(
                 hasMetadata,
                 metadataConverters,
                 producedTypeInfo,
-                upsertMode);
+                upsertMode,
+                keyIncludeKafkaConnectJsonSchema,
+                valueIncludeKafkaConnectJsonSchema);
     }
 
     private @Nullable DeserializationSchema<RowData> createDeserialization(
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 89dda61a1..9a1570e5f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -71,6 +71,8 @@
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.KEY_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_GROUP_ID;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_MODE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_SPECIFIC_OFFSETS;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_BOUNDED_TIMESTAMP_MILLIS;
@@ -152,6 +154,8 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(SCAN_BOUNDED_MODE);
         options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
         options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
+        options.add(RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA);
+        options.add(RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA);
         return options;
     }
 
@@ -215,6 +219,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {
 
         final String keyPrefix = tableOptions.getOptional(KEY_FIELDS_PREFIX).orElse(null);
 
+        final boolean keyIncludeKafkaConnectJsonSchema =
+                tableOptions
+                        .getOptional(RECORD_KEY_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA)
+                        .orElse(false);
+
+        final boolean valueIncludeKafkaConnectJsonSchema =
+                tableOptions
+                        .getOptional(RECORD_VALUE_INCLUDE_KAFKA_CONNECT_JSON_SCHEMA)
+                        .orElse(false);
+
         return createKafkaTableSource(
                 physicalDataType,
                 keyDecodingFormat.orElse(null),
@@ -231,7 +245,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                 boundedOptions.boundedMode,
                 boundedOptions.specificOffsets,
                 boundedOptions.boundedTimestampMillis,
-                context.getObjectIdentifier().asSummaryString());
+                context.getObjectIdentifier().asSummaryString(),
+                keyIncludeKafkaConnectJsonSchema,
+                valueIncludeKafkaConnectJsonSchema);
     }
 
     @Override
@@ -395,7 +411,9 @@ protected KafkaDynamicSource createKafkaTableSource(
             BoundedMode boundedMode,
             Map<KafkaTopicPartition, Long> specificEndOffsets,
             long endTimestampMillis,
-            String tableIdentifier) {
+            String tableIdentifier,
+            boolean keyIncludeKafkaConnectJsonSchema,
+            boolean valueIncludeKafkaConnectJsonSchema) {
         return new KafkaDynamicSource(
                 physicalDataType,
                 keyDecodingFormat,
@@ -413,7 +431,9 @@ protected KafkaDynamicSource createKafkaTableSource(
                 specificEndOffsets,
                 endTimestampMillis,
                 false,
-                tableIdentifier);
+                tableIdentifier,
+                keyIncludeKafkaConnectJsonSchema,
+                valueIncludeKafkaConnectJsonSchema);
     }
 
     protected KafkaDynamicSink createKafkaTableSink(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
index b0ca63161..0951dd26c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchemaTest.java
@@ -22,6 +22,7 @@
 import org.apache.flink.connector.testutils.formats.DummyInitializationContext;
 import org.apache.flink.connector.testutils.source.deserialization.TestingDeserializationContext;
 import org.apache.flink.formats.json.JsonDeserializationSchema;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
 import org.apache.flink.util.Collector;
 
@@ -32,6 +33,10 @@
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.json.JsonConverter;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -104,6 +109,25 @@ public void testKafkaValueDeserializationSchemaWrapper() throws Exception {
         assertThat(deserializedValue.get("metadata")).isNull();
     }
 
+    @Test
+    public void testKafkaValueDeserializationSchemaWrapperWithKafkaConnectSchema()
+            throws Exception {
+        final ConsumerRecord<byte[], byte[]> consumerRecord = getConsumerRecordFromSourceRecord();
+        KafkaRecordDeserializationSchema<JsonNode> schema =
+                KafkaRecordDeserializationSchema.valueOnly(
+                        new JsonDeserializationSchema<>(JsonNode.class), true);
+        schema.open(new DummyInitializationContext());
+        SimpleCollector<JsonNode> collector = new SimpleCollector<>();
+        schema.deserialize(consumerRecord, collector);
+
+        assertThat(collector.list).hasSize(1);
+        JsonNode deserializedValue = collector.list.get(0);
+
+        assertThat(deserializedValue.get("word").asText()).isEqualTo("world");
+        assertThat(deserializedValue.get("key")).isNull();
+        assertThat(deserializedValue.get("metadata")).isNull();
+    }
+
     @Test
     public void testKafkaValueDeserializerWrapper() throws Exception {
         final String topic = "Topic";
@@ -144,6 +168,26 @@ public void testKafkaValueDeserializerWrapperWithConfigurable() throws Exception {
         assertThat(configuration).isEmpty();
     }
 
+    private ConsumerRecord<byte[], byte[]> getConsumerRecordFromSourceRecord() {
+        Schema schema =
+                SchemaBuilder.struct()
+                        .field("index", Schema.INT32_SCHEMA)
+                        .field("word", Schema.STRING_SCHEMA)
+                        .build();
+        Struct value = new Struct(schema);
+        value.put("index", 4);
+        value.put("word", "world");
+
+        JsonConverter converter = new JsonConverter();
+        Map<String, Object> config = new HashMap<>();
+        config.put("converter.type", "value");
+        config.put("schemas.enable", true);
+        converter.configure(config);
+
+        byte[] serializedValue = converter.fromConnectData("topic#2", schema, value);
+        return new ConsumerRecord<>("topic#2", 3, 4L, null, serializedValue);
+    }
+
     private ConsumerRecord<byte[], byte[]> getConsumerRecord() throws JsonProcessingException {
         ObjectNode initialKey = OBJECT_MAPPER.createObjectNode();
         initialKey.put("index", 4);
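The converter setup in the test helper mirrors what a Kafka Connect worker does when a connector ships schema-enveloped JSON, typically configured with the standard Connect properties (illustrative):

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

With schemas.enable=false on the Connect side the value is plain JSON without the {"schema": ..., "payload": ...} wrapper, and the new flags and table options should be left at their default of false.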
diff --git a/pom.xml b/pom.xml
index 25d3c1012..2ee30d954 100644
--- a/pom.xml
+++ b/pom.xml
@@ -151,7 +151,6 @@ under the License.
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.logging.log4j</groupId>
 			<artifactId>log4j-slf4j-impl</artifactId>
@@ -193,7 +192,6 @@ under the License.
 		</dependency>
-
 		<dependency>
@@ -232,6 +230,14 @@ under the License.
 				<artifactId>kafka-clients</artifactId>
 				<version>${kafka.version}</version>
 			</dependency>
+
+			<dependency>
+				<groupId>org.apache.kafka</groupId>
+				<artifactId>connect-json</artifactId>
+				<version>${kafka.version}</version>
+				<scope>test</scope>
+			</dependency>
+
 			<dependency>
 				<groupId>org.apache.zookeeper</groupId>
 				<artifactId>zookeeper</artifactId>
@@ -434,7 +440,6 @@ under the License.
 				<type>pom</type>
 				<scope>import</scope>
 			</dependency>
-
 		</dependencies>
 	</dependencyManagement>