Kafka Debezium JSON supports automatic discovery of primary keys
sunxiaojian committed Jun 25, 2024
1 parent 3baca1c commit 7fac28d
Showing 11 changed files with 190 additions and 74 deletions.
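In short, the sync action can now derive a Paimon table's primary keys from the field names of the Debezium record key carried by the Kafka message. A minimal, self-contained sketch of that idea (plain Jackson rather than Paimon's shaded copy; class and variable names are illustrative, the real logic lives in the DebeziumRecordParser#extractPrimaryKeys change below):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

public class DebeziumKeyPrimaryKeySketch {
    public static void main(String[] args) throws Exception {
        // A Debezium JSON key as written by the new test data: {"schema": null, "payload": {"id": 101}}.
        String keyJson = "{\"schema\": null, \"payload\": {\"id\": 101}}";
        JsonNode key = new ObjectMapper().readTree(keyJson).get("payload");

        // Mirrors the new DebeziumRecordParser#extractPrimaryKeys: the key's field names become the primary keys.
        List<String> primaryKeys = new ArrayList<>();
        key.fieldNames().forEachRemaining(primaryKeys::add);
        System.out.println(primaryKeys); // prints [id]
    }
}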
@@ -35,6 +35,7 @@
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

@@ -130,6 +131,13 @@ public static <T> T fromJson(String json, TypeReference<T> typeReference) {
}
}

public static <T> ObjectNode setNode(ObjectNode node, String fieldName, T value) {
JsonNode nodeValue = OBJECT_MAPPER_INSTANCE.valueToTree(value);
node.set(fieldName, nodeValue);
return node;
}

public static <T> T fromJson(String json, Class<T> clazz) {
try {
return OBJECT_MAPPER_INSTANCE.reader().readValue(json, clazz);
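For context, a brief usage sketch of the new JsonSerdeUtil#setNode helper above (the mapper, node, and wrapper class here are illustrative; the shaded Jackson imports mirror the ones already used in this file):

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.utils.JsonSerdeUtil;

class SetNodeUsageSketch {
    static ObjectNode sampleKeyPayload() {
        ObjectNode payload = new ObjectMapper().createObjectNode();
        // setNode converts each value with valueToTree and attaches it to the node.
        JsonSerdeUtil.setNode(payload, "id", 101);
        JsonSerdeUtil.setNode(payload, "name", "scooter");
        return payload; // {"id":101,"name":"scooter"}
    }
}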
@@ -87,7 +87,6 @@ private static void sleepSafely(int duration) {

/** Wrap the consumer for different message queues. */
public interface ConsumerWrapper extends AutoCloseable {

List<CdcSourceRecord> getRecords(int pollTimeOutMills);

String topic();
@@ -76,6 +76,8 @@ public abstract class RecordParser

protected JsonNode root;

protected JsonNode key;

public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
this.typeMapping = typeMapping;
this.computedColumns = computedColumns;
@@ -85,6 +87,7 @@ public RecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumn
public Schema buildSchema(CdcSourceRecord record) {
try {
setRoot(record);
setKey(record);
if (isDDL()) {
return null;
}
@@ -175,7 +178,7 @@ protected void evalComputedColumns(
});
}

private List<String> extractPrimaryKeys() {
protected List<String> extractPrimaryKeys() {
ArrayNode pkNames = getNodeAs(root, primaryField(), ArrayNode.class);
if (pkNames == null) {
return Collections.emptyList();
@@ -204,6 +207,10 @@ private RichCdcMultiplexRecord createRecord(
new CdcRecord(rowKind, data));
}

protected void setKey(CdcSourceRecord record) {
key = (JsonNode) record.getKey();
}

protected void setRoot(CdcSourceRecord record) {
root = (JsonNode) record.getValue();
}
@@ -28,6 +28,7 @@
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -135,6 +136,22 @@ protected void setRoot(CdcSourceRecord record) {
}
}

@Override
protected void setKey(CdcSourceRecord record) {
JsonNode node = (JsonNode) record.getKey();
if (node == null) {
    // Unkeyed messages carry no primary-key hint; fall back to the value payload.
    key = null;
    return;
}
hasSchema = false;
if (node.has(FIELD_SCHEMA)) {
key = node.get(FIELD_PAYLOAD);
JsonNode schema = node.get(FIELD_SCHEMA);
if (!isNull(schema)) {
parseSchema(schema);
hasSchema = true;
}
} else {
key = node;
}
}

private void parseSchema(JsonNode schema) {
debeziumTypes.clear();
classNames.clear();
@@ -217,6 +234,16 @@ protected Map<String, String> extractRowData(JsonNode record, RowType.Builder ro
return resultMap;
}

@Override
protected List<String> extractPrimaryKeys() {
if (key != null) {
List<String> primaryKeys = Lists.newArrayList();
key.fieldNames().forEachRemaining(primaryKeys::add);
return primaryKeys;
}
return super.extractPrimaryKeys();
}

@Override
protected String primaryField() {
return FIELD_PRIMARY;
@@ -87,7 +87,6 @@ public static KafkaSource<CdcSourceRecord> buildKafkaSource(Configuration kafkaC
kafkaSourceBuilder
.setValueOnlyDeserializer(new CdcJsonDeserializationSchema())
.setGroupId(kafkaPropertiesGroupId(kafkaConfig));

Properties properties = createKafkaProperties(kafkaConfig);

StartupMode startupMode =

This file was deleted.

@@ -328,6 +328,18 @@ private boolean isRecordLine(String line) {
}
}

private void send(String topic, String key, String record, boolean wait) {
Future<RecordMetadata> sendFuture =
kafkaProducer.send(new ProducerRecord<>(topic, key, record));
if (wait) {
try {
sendFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}

private void send(String topic, String record, boolean wait) {
Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
if (wait) {
@@ -339,6 +351,20 @@ private void send(String topic, String record, boolean wait) {
}
}

void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat)
throws Exception {
URL url =
KafkaCanalSyncTableActionITCase.class
.getClassLoader()
.getResource(resourceDirFormat);
List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
lines.stream()
.map(line -> line.split(";"))
.filter(keyValues -> (keyValues.length > 1))
.filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1]))
.forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait));
}

/** Kafka container extension for junit5. */
private static class KafkaContainerExtension extends KafkaContainer
implements BeforeAllCallback, AfterAllCallback {
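For reference, each line in the new primary-key test resources pairs a Debezium key and a Debezium value, separated by a semicolon; the keyed writeRecordsToKafka helper above splits the line and sends the two parts as the Kafka record key and value. An abridged example line (trimmed from debezium-data-1.txt, added at the end of this commit):

{"schema": null, "payload": {"id": 101}};{"schema": null, "payload": {"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "op": "c", ...}}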
@@ -147,4 +147,16 @@ public void testMessageWithNullValue() throws Exception {
rowType,
Collections.singletonList("id"));
}

@Timeout(120)
@Test
public void testRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
testRecordWithPrimaryKeys(DEBEZIUM);
}

@Test
@Timeout(120)
public void testSchemaIncludeRecordAndAutoDiscoveryPrimaryKeys() throws Exception {
testSchemaIncludeRecordWithPrimaryKeys(DEBEZIUM);
}
}
@@ -569,6 +569,78 @@ public void testSchemaIncludeRecord(String format) throws Exception {
waitForResult(expected, table, rowType, primaryKeys);
}

public void testRecordWithPrimaryKeys(String format) throws Exception {
String topic = "no_schema_include_with_primary_keys";
createTestTopic(topic, 1, 1);

try {
writeRecordsToKafka(
topic, false, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
} catch (Exception e) {
throw new Exception("Failed to write debezium data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable(tableName);

RowType rowType =
RowType.of(
new DataType[] {
DataTypes.STRING().notNull(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING()
},
new String[] {"id", "name", "description", "weight"});
List<String> primaryKeys = Collections.singletonList("id");
List<String> expected =
Arrays.asList(
"+I[101, scooter, Small 2-wheel scooter, 3.14]",
"+I[102, car battery, 12V car battery, 8.1]");
waitForResult(expected, table, rowType, primaryKeys);
}

public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
String topic = "schema_include_with_primary_keys";
createTestTopic(topic, 1, 1);
try {
writeRecordsToKafka(
topic,
false,
"kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
} catch (Exception e) {
throw new Exception("Failed to write debezium data to Kafka.", e);
}
Map<String, String> kafkaConfig = getBasicKafkaConfig();
kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
kafkaConfig.put(TOPIC.key(), topic);
KafkaSyncTableAction action =
syncTableActionBuilder(kafkaConfig).withTableConfig(getBasicTableConfig()).build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable(tableName);

RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT().notNull(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.DOUBLE()
},
new String[] {"id", "name", "description", "weight"});
List<String> primaryKeys = Collections.singletonList("id");
List<String> expected =
Collections.singletonList(
"+I[101, scooter, Small 2-wheel scooter, 3.140000104904175]");
waitForResult(expected, table, rowType, primaryKeys);
}

// TODO some types are different from mysql cdc; maybe need to fix
public void testAllTypesWithSchemaImpl(String format) throws Exception {
String topic = "schema_include_all_type";
@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{"schema": null ,"payload": {"id": 101}};{"schema":null, "payload":{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
{"schema": null ,"payload": {"id": 102}};{"schema":null, "payload":{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null}}
@@ -0,0 +1,18 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
{"schema": null ,"payload": {"id": 101}};{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"},{"type":"string","optional":true,"field":"description"},{"type":"double","optional":true,"field":"weight"}],"optional":true,"name":"dbserver1.inventory.products.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":"total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false,"name":"dbserver1.inventory.products.Envelope"},"payload":{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}}
