diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index 43d16c04c27f..1a9c07ccb2b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -35,6 +35,7 @@
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
@@ -130,6 +131,12 @@ public static <T> T fromJson(String json, TypeReference<T> typeReference) {
         }
     }
 
+    public static <T> ObjectNode setNode(ObjectNode node, String fieldName, T value) {
+        JsonNode nodeValue = OBJECT_MAPPER_INSTANCE.valueToTree(value);
+        node.set(fieldName, nodeValue);
+        return node;
+    }
+
     public static <T> T fromJson(String json, Class<T> clazz) {
         try {
             return OBJECT_MAPPER_INSTANCE.reader().readValue(json, clazz);
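For reference, the new JsonSerdeUtil.setNode helper above serializes an arbitrary value into a Jackson tree and attaches it to an ObjectNode under the given field name. A minimal standalone sketch of the same behavior, using unshaded Jackson imports for illustration (Paimon itself uses the shaded org.apache.paimon.shade.jackson2 packages):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.Arrays;
import java.util.List;

public class SetNodeSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Mirrors JsonSerdeUtil.setNode: serialize any value into a JsonNode tree
    // and attach it to the target ObjectNode under the given field name.
    public static <T> ObjectNode setNode(ObjectNode node, String fieldName, T value) {
        JsonNode nodeValue = MAPPER.valueToTree(value);
        node.set(fieldName, nodeValue);
        return node;
    }

    public static void main(String[] args) {
        ObjectNode record = MAPPER.createObjectNode();
        List<String> primaryKeys = Arrays.asList("id");
        setNode(record, "primary-keys", primaryKeys);
        System.out.println(record); // {"primary-keys":["id"]}
    }
}
```

Reusing one shared mapper (OBJECT_MAPPER_INSTANCE in Paimon, MAPPER in the sketch) is deliberate: ObjectMapper construction is comparatively expensive, and a configured instance is thread-safe for this kind of use.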
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
index c517670e40cf..109cf971af4a 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java
@@ -87,7 +87,6 @@ private static void sleepSafely(int duration) {
 
     /** Wrap the consumer for different message queues. */
     public interface ConsumerWrapper<T> extends AutoCloseable {
-
         List<T> getRecords(int pollTimeOutMills);
 
         String topic();
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
index ea9fa5492b7a..790b8a5c53a5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java
@@ -76,6 +76,8 @@ public abstract class RecordParser
 
     protected JsonNode root;
 
+    protected JsonNode key;
+
     public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
         this.typeMapping = typeMapping;
         this.computedColumns = computedColumns;
@@ -85,6 +87,7 @@ public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
     public Schema buildSchema(CdcSourceRecord record) {
         try {
             setRoot(record);
+            setKey(record);
             if (isDDL()) {
                 return null;
             }
@@ -175,7 +178,7 @@ protected void evalComputedColumns(
                 });
     }
 
-    private List<String> extractPrimaryKeys() {
+    protected List<String> extractPrimaryKeys() {
         ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
         if (pkNames == null) {
             return Collections.emptyList();
@@ -204,6 +207,10 @@ private RichCdcMultiplexRecord createRecord(
                 new CdcRecord(rowKind, data));
     }
 
+    protected void setKey(CdcSourceRecord record) {
+        key = (JsonNode) record.getKey();
+    }
+
     protected void setRoot(CdcSourceRecord record) {
         root = (JsonNode) record.getValue();
     }
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
index c2b6587547e4..2d37eefe7ee5 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java
@@ -28,6 +28,7 @@
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -135,6 +136,22 @@ protected void setRoot(CdcSourceRecord record) {
         }
     }
 
+    @Override
+    protected void setKey(CdcSourceRecord record) {
+        JsonNode node = (JsonNode) record.getKey();
+        hasSchema = false;
+        if (node.has(FIELD_SCHEMA)) {
+            key = node.get(FIELD_PAYLOAD);
+            JsonNode schema = node.get(FIELD_SCHEMA);
+            if (!isNull(schema)) {
+                parseSchema(schema);
+                hasSchema = true;
+            }
+        } else {
+            key = node;
+        }
+    }
+
     private void parseSchema(JsonNode schema) {
         debeziumTypes.clear();
         classNames.clear();
@@ -217,6 +234,16 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder rowTypeBuilder) {
         return resultMap;
     }
 
+    @Override
+    protected List<String> extractPrimaryKeys() {
+        if (key != null) {
+            List<String> primaryKeys = Lists.newArrayList();
+            key.fieldNames().forEachRemaining(primaryKeys::add);
+            return primaryKeys;
+        }
+        return super.extractPrimaryKeys();
+    }
+
     @Override
     protected String primaryField() {
         return FIELD_PRIMARY;
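This is the heart of the change: when a Kafka record carries a Debezium JSON key, DebeziumRecordParser derives the table's primary keys from the key payload's field names, and only falls back to the value's primary-key field when no key is available. A sketch of that discovery logic in plain Jackson (the literals "schema" and "payload" stand in for FIELD_SCHEMA and FIELD_PAYLOAD above, and the raw string stands in for CdcSourceRecord.getKey()):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

public class DebeziumKeySketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Debezium JSON keys come in two shapes: wrapped in a schema envelope
    // ({"schema": ..., "payload": {...}}) or as the bare payload object.
    static List<String> primaryKeysOf(String rawKey) throws Exception {
        JsonNode node = MAPPER.readTree(rawKey);
        JsonNode key = node.has("schema") ? node.get("payload") : node;
        List<String> primaryKeys = new ArrayList<>();
        key.fieldNames().forEachRemaining(primaryKeys::add);
        return primaryKeys;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(primaryKeysOf("{\"schema\": null, \"payload\": {\"id\": 101}}")); // [id]
        System.out.println(primaryKeysOf("{\"id\": 101}")); // [id]
    }
}
```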
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
index 4f0be0ef221e..c425cb6544b1 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java
@@ -87,7 +87,6 @@ public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration kafkaConfig) {
         kafkaSourceBuilder
                 .setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
                 .setGroupId(kafkaPropertiesGroupId(kafkaConfig));
-
         Properties properties = createKafkaProperties(kafkaConfig);
 
         StartupMode startupMode =
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
deleted file mode 100644
index 5e6b96670bdb..000000000000
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * A class that wraps a {@link DeserializationSchema} as the value deserializer for a {@link
- * ConsumerRecord}.
- *
- * @param <T> the return type of the deserialization.
- */
-class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializationSchema<T> {
-    private static final long serialVersionUID = 1L;
-    private final DeserializationSchema<T> deserializationSchema;
-    private static final Logger LOG =
-            LoggerFactory.getLogger(KafkaValueOnlyDeserializationSchemaWrapper.class);
-
-    KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
-        this.deserializationSchema = deserializationSchema;
-    }
-
-    @Override
-    public void open(DeserializationSchema.InitializationContext context) throws Exception {
-        deserializationSchema.open(context);
-    }
-
-    @Override
-    public void deserialize(ConsumerRecord<byte[], byte[]> message, Collector<T> out)
-            throws IOException {
-        if (message.value() != null) {
-            deserializationSchema.deserialize(message.value(), out);
-        } else {
-            // see
-            // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events
-            LOG.info(
-                    "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, "
-                            + "please check your Debezium and Kafka configuration.",
-                    message);
-        }
-    }
-
-    @Override
-    public TypeInformation<T> getProducedType() {
-        return deserializationSchema.getProducedType();
-    }
-}
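With the value-only wrapper removed, the test base below gains a keyed send path: each fixture line is a key;value pair, split on ";" and produced with both halves so the Debezium key actually reaches the parser. A simplified sketch of that producer path (assumes an already configured KafkaProducer<String, String>; topic and line are caller-supplied):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.concurrent.Future;

public class KeyedSendSketch {

    // One fixture line is "keyJson;valueJson"; sending both halves makes the
    // key available through ConsumerRecord.key() downstream.
    static void sendKeyed(KafkaProducer<String, String> producer, String topic, String line)
            throws Exception {
        String[] keyValue = line.split(";");
        if (keyValue.length > 1) {
            Future<RecordMetadata> sendFuture =
                    producer.send(new ProducerRecord<>(topic, keyValue[0], keyValue[1]));
            sendFuture.get(); // block for the broker ack, like wait=true in the test base
        }
    }
}
```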
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 7d8c78c2eddf..72225e7df2a8 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -328,6 +328,18 @@ private boolean isRecordLine(String line) {
         }
     }
 
+    private void send(String topic, String key, String record, boolean wait) {
+        Future<RecordMetadata> sendFuture =
+                kafkaProducer.send(new ProducerRecord<>(topic, key, record));
+        if (wait) {
+            try {
+                sendFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
     private void send(String topic, String record, boolean wait) {
         Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
         if (wait) {
@@ -339,6 +351,20 @@ private void send(String topic, String record, boolean wait) {
         }
     }
 
+    void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat)
+            throws Exception {
+        URL url =
+                KafkaCanalSyncTableActionITCase.class
+                        .getClassLoader()
+                        .getResource(String.format(resourceDirFormat));
+        List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
+        lines.stream()
+                .map(line -> line.split(";"))
+                .filter(keyValues -> (keyValues.length > 1))
+                .filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1]))
+                .forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait));
+    }
+
     /** Kafka container extension for junit5. */
     private static class KafkaContainerExtension extends KafkaContainer
             implements BeforeAllCallback, AfterAllCallback {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
index 04dfb3769a61..a4fbca868b9f 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java
@@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception {
                 rowType,
                 Collections.singletonList("id"));
     }
+
+    @Timeout(120)
+    @Test
+    public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
+        testRecordWithPrimaryKeys(DEBEZIUM);
+    }
+
+    @Test
+    @Timeout(120)
+    public void testSchemaIncludeRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
+        testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 5f7df79e48ed..b6701366df4d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -569,6 +569,78 @@ public void testSchemaIncludeRecord(String format) throws Exception {
         waitForResult(expected, table, rowType, primaryKeys);
     }
 
+    public void testRecordWithPrimaryKeys(String format) throws Exception {
+        String topic = "no_schema_include_with_primary_keys";
+        createTestTopic(topic, 1, 1);
+
+        try {
+            writeRecordsToKafka(
+                    topic, false, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.STRING().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Arrays.asList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.14]",
+                        "+I[102, car battery, 12V car battery, 8.1]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
+    public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
+        String topic = "schema_include_with_primary_keys";
+        createTestTopic(topic, 1, 1);
+        try {
+            writeRecordsToKafka(
+                    topic,
+                    false,
+                    "kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
+        } catch (Exception e) {
+            throw new Exception("Failed to write debezium data to Kafka.", e);
+        }
+        Map<String, String> kafkaConfig = getBasicKafkaConfig();
+        kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
+        kafkaConfig.put(TOPIC.key(), topic);
+        KafkaSyncTableAction action =
+                syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
+        runActionWithDefaultEnv(action);
+
+        FileStoreTable table = getFileStoreTable(tableName);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.STRING(),
+                            DataTypes.STRING(),
+                            DataTypes.DOUBLE()
+                        },
+                        new String[] {"id", "name", "description", "weight"});
+        List<String> primaryKeys = Collections.singletonList("id");
+        List<String> expected =
+                Collections.singletonList(
+                        "+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
+        waitForResult(expected, table, rowType, primaryKeys);
+    }
+
     // TODO some types are different from mysql cdc; maybe need to fix
     public void testAllTypesWithSchemaImpl(String format) throws Exception {
         String topic = "schema_include_all_type";
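Note how the expected schemas differ between the two tests above: without the Debezium schema envelope every synced column is STRING, while the schema-included fixture yields typed INT and DOUBLE columns. An illustrative mapping for the primitive type names that appear in these fixtures (a sketch only, not Paimon's actual mapping table):

```java
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

public class DebeziumTypeSketch {

    // Hypothetical mapping for the Debezium primitive types used in the
    // fixtures below; without schema information the parser can only fall
    // back to STRING, which is exactly what testRecordWithPrimaryKeys expects.
    static DataType toPaimonType(String debeziumType) {
        switch (debeziumType) {
            case "int32":
                return DataTypes.INT();
            case "int64":
                return DataTypes.BIGINT();
            case "double":
                return DataTypes.DOUBLE();
            default:
                return DataTypes.STRING();
        }
    }
}
```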
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt
new file mode 100644
index 000000000000..9bb00c7786d8
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"schema": null ,"payload": {"id": 101}};{"schema":null, "payload":{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
+{"schema": null ,"payload": {"id": 102}};{"schema":null, "payload":{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt
new file mode 100644
index 000000000000..1feaafe9fc20
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{"schema": null ,"payload": {"id": 101}};{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}
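As a quick sanity check on the fixture format: the key payload's field names should resolve against the value's "after" image. A small illustrative parse of an abbreviated fixture line (plain Jackson; the value JSON is heavily shortened here):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ResourceLineCheck {

    public static void main(String[] args) throws Exception {
        // Abbreviated form of the first line of debezium-data-1.txt (key;value).
        String line =
                "{\"schema\": null, \"payload\": {\"id\": 101}};"
                        + "{\"schema\": null, \"payload\": {\"before\": null,"
                        + " \"after\": {\"id\": 101, \"name\": \"scooter\"}, \"op\": \"c\"}}";
        String[] keyValue = line.split(";", 2);

        ObjectMapper mapper = new ObjectMapper();
        JsonNode keyPayload = mapper.readTree(keyValue[0]).get("payload");
        JsonNode after = mapper.readTree(keyValue[1]).get("payload").get("after");

        // Each primary-key field named by the key exists in the row image.
        keyPayload
                .fieldNames()
                .forEachRemaining(pk -> System.out.println(pk + " -> " + after.get(pk)));
        // prints: id -> 101
    }
}
```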