Skip to content

Commit 64e3ddc

Browse files
committed
Kafka debezium json supports automatic discovery of primary keys
1 parent 3baca1c commit 64e3ddc

File tree

8 files changed

+179
-72
lines changed

8 files changed

+179
-72
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public abstract class RecordParser
7676

7777
protected JsonNode root;
7878

79+
protected JsonNode key;
80+
7981
public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
8082
this.typeMapping = typeMapping;
8183
this.computedColumns = computedColumns;
@@ -85,6 +87,7 @@ public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumn
8587
public Schema buildSchema(CdcSourceRecord record) {
8688
try {
8789
setRoot(record);
90+
setKey(record);
8891
if (isDDL()) {
8992
return null;
9093
}
@@ -175,7 +178,7 @@ protected void evalComputedColumns(
175178
});
176179
}
177180

178-
private List<String> extractPrimaryKeys() {
181+
protected List<String> extractPrimaryKeys() {
179182
ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
180183
if (pkNames == null) {
181184
return Collections.emptyList();
@@ -204,6 +207,10 @@ private RichCdcMultiplexRecord createRecord(
204207
new CdcRecord(rowKind, data));
205208
}
206209

210+
protected void setKey(CdcSourceRecord record) {
211+
key = (JsonNode) record.getKey();
212+
}
213+
207214
protected void setRoot(CdcSourceRecord record) {
208215
root = (JsonNode) record.getValue();
209216
}

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.paimon.utils.JsonSerdeUtil;
2929
import org.apache.paimon.utils.Preconditions;
3030

31+
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
3132
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
3233
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
3334
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -135,6 +136,19 @@ protected void setRoot(CdcSourceRecord record) {
135136
}
136137
}
137138

139+
@Override
140+
protected void setKey(CdcSourceRecord record) {
141+
JsonNode node = (JsonNode) record.getKey();
142+
if (node == null) {
143+
return;
144+
}
145+
if (node.has(FIELD_SCHEMA)) {
146+
key = node.get(FIELD_PAYLOAD);
147+
} else {
148+
key = node;
149+
}
150+
}
151+
138152
private void parseSchema(JsonNode schema) {
139153
debeziumTypes.clear();
140154
classNames.clear();
@@ -217,6 +231,16 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
217231
return resultMap;
218232
}
219233

234+
@Override
235+
protected List<String> extractPrimaryKeys() {
236+
if (key != null) {
237+
List<String> primaryKeys = Lists.newArrayList();
238+
key.fieldNames().forEachRemaining(primaryKeys::add);
239+
return primaryKeys;
240+
}
241+
return super.extractPrimaryKeys();
242+
}
243+
220244
@Override
221245
protected String primaryField() {
222246
return FIELD_PRIMARY;

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java

Lines changed: 0 additions & 71 deletions
This file was deleted.

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,18 @@ private boolean isRecordLine(String line) {
328328
}
329329
}
330330

331+
private void send(String topic, String key, String record, boolean wait) {
332+
Future<RecordMetadata> sendFuture =
333+
kafkaProducer.send(new ProducerRecord<>(topic, key, record));
334+
if (wait) {
335+
try {
336+
sendFuture.get();
337+
} catch (InterruptedException | ExecutionException e) {
338+
throw new RuntimeException(e);
339+
}
340+
}
341+
}
342+
331343
private void send(String topic, String record, boolean wait) {
332344
Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
333345
if (wait) {
@@ -339,6 +351,20 @@ private void send(String topic, String record, boolean wait) {
339351
}
340352
}
341353

354+
void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat)
355+
throws Exception {
356+
URL url =
357+
KafkaCanalSyncTableActionITCase.class
358+
.getClassLoader()
359+
.getResource(String.format(resourceDirFormat));
360+
List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
361+
lines.stream()
362+
.map(line -> line.split(";"))
363+
.filter(keyValues -> (keyValues.length > 1))
364+
.filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1]))
365+
.forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait));
366+
}
367+
342368
/** Kafka container extension for junit5. */
343369
private static class KafkaContainerExtension extends KafkaContainer
344370
implements BeforeAllCallback, AfterAllCallback {

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception {
147147
rowType,
148148
Collections.singletonList("id"));
149149
}
150+
151+
@Timeout(120)
152+
@Test
153+
public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
154+
testRecordWithPrimaryKeys(DEBEZIUM);
155+
}
156+
157+
@Test
158+
@Timeout(120)
159+
public void testSchemaIncludeRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
160+
testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
161+
}
150162
}

paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,78 @@ public void testSchemaIncludeRecord(String format) throws Exception {
569569
waitForResult(expected, table, rowType, primaryKeys);
570570
}
571571

572+
public void testRecordWithPrimaryKeys(String format) throws Exception {
573+
String topic = "no_schema_include_with_primary_keys";
574+
createTestTopic(topic, 1, 1);
575+
576+
try {
577+
writeRecordsToKafka(
578+
topic, false, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
579+
} catch (Exception e) {
580+
throw new Exception("Failed to write debezium data to Kafka.", e);
581+
}
582+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
583+
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
584+
kafkaConfig.put(TOPIC.key(), topic);
585+
KafkaSyncTableAction action =
586+
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
587+
runActionWithDefaultEnv(action);
588+
589+
FileStoreTable table = getFileStoreTable(tableName);
590+
591+
RowType rowType =
592+
RowType.of(
593+
new DataType[] {
594+
DataTypes.STRING().notNull(),
595+
DataTypes.STRING(),
596+
DataTypes.STRING(),
597+
DataTypes.STRING()
598+
},
599+
new String[] {"id", "name", "description", "weight"});
600+
List<String> primaryKeys = Collections.singletonList("id");
601+
List<String> expected =
602+
Arrays.asList(
603+
"+I[101, scooter, Small 2-wheel scooter, 3.14]",
604+
"+I[102, car battery, 12V car battery, 8.1]");
605+
waitForResult(expected, table, rowType, primaryKeys);
606+
}
607+
608+
public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
609+
String topic = "schema_include_with_primary_keys";
610+
createTestTopic(topic, 1, 1);
611+
try {
612+
writeRecordsToKafka(
613+
topic,
614+
false,
615+
"kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
616+
} catch (Exception e) {
617+
throw new Exception("Failed to write debezium data to Kafka.", e);
618+
}
619+
Map<String, String> kafkaConfig = getBasicKafkaConfig();
620+
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
621+
kafkaConfig.put(TOPIC.key(), topic);
622+
KafkaSyncTableAction action =
623+
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
624+
runActionWithDefaultEnv(action);
625+
626+
FileStoreTable table = getFileStoreTable(tableName);
627+
628+
RowType rowType =
629+
RowType.of(
630+
new DataType[] {
631+
DataTypes.INT().notNull(),
632+
DataTypes.STRING(),
633+
DataTypes.STRING(),
634+
DataTypes.DOUBLE()
635+
},
636+
new String[] {"id", "name", "description", "weight"});
637+
List<String> primaryKeys = Collections.singletonList("id");
638+
List<String> expected =
639+
Collections.singletonList(
640+
"+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
641+
waitForResult(expected, table, rowType, primaryKeys);
642+
}
643+
572644
// TODO some types are different from mysql cdc; maybe need to fix
573645
public void testAllTypesWithSchemaImpl(String format) throws Exception {
574646
String topic = "schema_include_all_type";
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
{"schema": null ,"payload": {"id": 101}};{"schema":null, "payload":{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
19+
{"schema": null ,"payload": {"id": 102}};{"schema":null, "payload":{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
{"schema": null ,"payload": {"id": 101}};{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}

0 commit comments

Comments
 (0)