From 18866bb5d867fa908a7f382a95f41513af7714a0 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Wed, 1 Nov 2023 11:22:41 +0800 Subject: [PATCH] Add debezium-json format #2227 --- .../action/cdc/MessageQueueSchemaUtils.java | 3 +- .../cdc/MessageQueueSyncTableActionBase.java | 5 +- .../flink/action/cdc/format/DataFormat.java | 4 +- .../flink/action/cdc/format/RecordParser.java | 22 +- .../debezium/DebeziumJsonRecordParser.java | 218 ++++++++ .../format/debezium/DebeziumRecordParser.java | 143 ----- .../action/cdc/kafka/KafkaActionUtils.java | 55 +- .../KafkaKeyValueDeserializationSchema.java | 59 ++ .../cdc/kafka/KafkaSyncDatabaseAction.java | 3 +- .../cdc/kafka/KafkaSyncTableAction.java | 7 +- .../action/cdc/pulsar/PulsarActionUtils.java | 12 +- .../PulsarKeyValueDeserializationSchema.java | 55 ++ .../cdc/pulsar/PulsarSyncTableAction.java | 3 +- .../flink/kafka/KafkaLogStoreFactory.java | 5 +- .../flink/kafka/KafkaLogStoreRegister.java | 3 +- ...KafkaDebeziumSyncDatabaseActionITCase.java | 510 ++++++++++++++++++ .../KafkaDebeziumSyncTableActionITCase.java | 368 ++++++++++++- .../include/topic0/debezium-data-1.txt | 22 + .../prefixsuffix/topic0/debezium-data-1.txt | 20 + .../prefixsuffix/topic0/debezium-data-2.txt | 20 + .../prefixsuffix/topic1/debezium-data-1.txt | 20 + .../prefixsuffix/topic1/debezium-data-2.txt | 20 + .../topic0/debezium-data-1.txt | 20 + .../topic0/debezium-data-2.txt | 20 + .../topic1/debezium-data-1.txt | 20 + .../topic1/debezium-data-2.txt | 20 + .../table/computedcolumn/debezium-data-1.txt | 2 +- .../table/schemaevolution/debezium-data-1.txt | 4 +- .../table/schemaevolution/debezium-data-2.txt | 4 +- .../table/schemaevolution/debezium-data-3.txt | 8 +- .../table/startupmode/debezium-data-1.txt | 20 + .../table/startupmode/debezium-data-2.txt | 20 + 32 files changed, 1491 insertions(+), 224 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java delete mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java create mode 100644 paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseActionITCase.java create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/include/topic0/debezium-data-1.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic0/debezium-data-1.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic0/debezium-data-2.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic1/debezium-data-1.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic1/debezium-data-2.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic0/debezium-data-1.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic0/debezium-data-2.txt create mode 100644 
paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic1/debezium-data-1.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic1/debezium-data-2.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/startupmode/debezium-data-1.txt create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/startupmode/debezium-data-2.txt diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java index f8273a93ad907..45d54dd0338d4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSchemaUtils.java @@ -21,6 +21,7 @@ import org.apache.paimon.flink.action.cdc.format.DataFormat; import org.apache.paimon.flink.action.cdc.format.RecordParser; import org.apache.paimon.schema.Schema; +import org.apache.paimon.utils.Pair; import java.util.Collections; import java.util.List; @@ -92,6 +93,6 @@ public SchemaRetrievalException(String message) { /** Wrap the consumer for different message queues. */ public interface ConsumerWrapper extends AutoCloseable { - List getRecords(String topic, int pollTimeOutMills); + List> getRecords(String topic, int pollTimeOutMills); } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java index 970f373244314..2d68019826d06 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/MessageQueueSyncTableActionBase.java @@ -31,6 +31,7 @@ import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.Pair; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.connector.source.Source; @@ -137,7 +138,7 @@ public MessageQueueSyncTableActionBase withTypeMapping(TypeMapping typeMapping) return this; } - protected abstract Source buildSource(); + protected abstract Source buildSource(); protected abstract String topic(); @@ -149,7 +150,7 @@ public MessageQueueSyncTableActionBase withTypeMapping(TypeMapping typeMapping) @Override public void build() throws Exception { - Source source = buildSource(); + Source source = buildSource(); catalog.createDatabase(database, true); boolean caseSensitive = catalog.caseSensitive(); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java index 28dc3e457c093..e1421e98ef1f4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/DataFormat.java @@ -21,7 +21,7 @@ import org.apache.paimon.flink.action.cdc.ComputedColumn; import 
org.apache.paimon.flink.action.cdc.TypeMapping; import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser; -import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser; +import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonRecordParser; import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser; import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser; @@ -38,7 +38,7 @@ public enum DataFormat { CANAL_JSON(CanalRecordParser::new), OGG_JSON(OggRecordParser::new), MAXWELL_JSON(MaxwellRecordParser::new), - DEBEZIUM_JSON(DebeziumRecordParser::new); + DEBEZIUM_JSON(DebeziumJsonRecordParser::new); // Add more data formats here if needed private final RecordParserFactory parser; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java index b5465ac5e6aed..a0fa01ce4b5fc 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/RecordParser.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.JsonSerdeUtil; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.StringUtils; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; @@ -68,10 +69,13 @@ * Subclasses are expected to provide specific implementations for extracting records, validating * message formats, and other format-specific operations. */ -public abstract class RecordParser implements FlatMapFunction { +public abstract class RecordParser implements FlatMapFunction { protected static final String FIELD_TABLE = "table"; protected static final String FIELD_DATABASE = "database"; + + protected static final String FIELD_RECORD_KEY = "key"; + protected static final String FIELD_RECORD_VALUE = "value"; protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); protected final boolean caseSensitive; protected final TypeMapping typeMapping; @@ -89,13 +93,18 @@ public RecordParser( this.computedColumns = computedColumns; } + @Nullable + public Schema buildSchema(Pair record) { + this.parseKeyIfNeed(record.getKey()); + return this.buildSchema(record.getValue()); + } + @Nullable public Schema buildSchema(String record) { this.parseRootJson(record); if (BooleanUtils.isTrue(this.isDDL())) { return null; } - tableName = extractStringFromRootJson(FIELD_TABLE); this.validateFormat(); this.extractPrimaryKeys(); @@ -155,10 +164,9 @@ protected LinkedHashMap setPaimonFieldType() { } @Override - public void flatMap(String value, Collector out) throws Exception { - root = OBJECT_MAPPER.readValue(value, JsonNode.class); + public void flatMap(Pair record, Collector out) throws Exception { + root = OBJECT_MAPPER.readValue((String) record.getValue(), JsonNode.class); this.validateFormat(); - databaseName = extractStringFromRootJson(FIELD_DATABASE); tableName = extractStringFromRootJson(FIELD_TABLE); @@ -231,7 +239,9 @@ protected RichCdcMultiplexRecord createRecord( new CdcRecord(rowKind, data)); } - protected final void parseRootJson(String record) { + protected void parseKeyIfNeed(String key) {} + + protected void parseRootJson(String record) { try { root = OBJECT_MAPPER.readValue(record, JsonNode.class); } catch 
(JsonProcessingException e) { diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java new file mode 100644 index 0000000000000..4e9434ea9e6ab --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumJsonRecordParser.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.format.debezium; + +import org.apache.paimon.flink.action.cdc.ComputedColumn; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.action.cdc.format.RecordParser; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.types.RowKind; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.StringUtils; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import org.apache.commons.lang3.BooleanUtils; +import org.apache.flink.util.Collector; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +import static org.apache.paimon.utils.JsonSerdeUtil.isNull; +import static org.apache.paimon.utils.Preconditions.checkArgument; + +/** + * The {@code DebeziumJsonRecordParser} class extends the abstract {@link RecordParser} and is designed + * to parse records from Debezium's JSON change data capture (CDC) format. Debezium is a CDC + * solution that captures row-level changes from databases such as MySQL and outputs them + * in JSON format. This parser extracts relevant information from the debezium-json format and + * converts it into a list of {@link RichCdcMultiplexRecord} objects. + * + *
<p>
The class supports database operations such as INSERT, UPDATE, DELETE, and READ (snapshot), and creates + * corresponding {@link RichCdcMultiplexRecord} objects to represent these changes. + * + *
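 * <p>For illustration, a minimal sketch of the value side of a Debezium JSON change event for an
 * insert, with the optional schema/payload envelope omitted (the field values and the db/table
 * names below are invented examples, not taken from this patch):
 *
 * <pre>{@code
 * {"before": null, "after": {"id": "101", "name": "scooter"}, "source": {"db": "mydb", "table": "products"}, "op": "c"}
 * }</pre>
 *
 * <p>The corresponding record key carries the primary key fields, e.g. {@code {"id": "101"}}, which
 * is where this parser reads the primary keys from.
 *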
<p>
Validation is performed to ensure that the JSON records contain all necessary fields, and the + * class also supports schema extraction for the Kafka topic. + */ +public class DebeziumJsonRecordParser extends RecordParser { + + private static final String RECORD_PAYLOAD = "payload"; + private static final String RECORD_SCHEMA = "schema"; + private static final String RECORD_SOURCE = "source"; + private static final String RECORD_SOURCE_DB = "db"; + private static final String FIELD_BEFORE = "before"; + private static final String OP_FIELD = "op"; + private static final String OP_READ = "r"; // snapshot read + private static final String OP_CREATE = "c"; // insert + private static final String OP_UPDATE = "u"; // update + private static final String OP_DELETE = "d"; // delete + + protected JsonNode keyPayload; + protected JsonNode source; + + public DebeziumJsonRecordParser( + boolean caseSensitive, TypeMapping typeMapping, List computedColumns) { + super(caseSensitive, typeMapping, computedColumns); + } + + @Override + protected Boolean isDDL() { + return root.has("tableChanges"); + } + + @Override + protected List extractRecords() { + List records = new ArrayList<>(); + // skip ddl + if (BooleanUtils.isTrue(this.isDDL())) { + return Collections.emptyList(); + } + String operation = extractStringFromRootJson(OP_FIELD); + switch (operation) { + case OP_CREATE: + case OP_READ: + processRecord(root.get(dataField()), RowKind.INSERT, records); + break; + case OP_UPDATE: + processRecord(root.get(FIELD_BEFORE), RowKind.DELETE, records); + processRecord(root.get(dataField()), RowKind.INSERT, records); + break; + case OP_DELETE: + processRecord(root.get(FIELD_BEFORE), RowKind.DELETE, records); + break; + default: + throw new UnsupportedOperationException("Unknown record operation: " + operation); + } + return records; + } + + @Override + protected void validateFormat() { + String errorMessageTemplate = + "Didn't find '%s' node in json. 
Please make sure your topic's format is correct."; + checkArgument(!isNull(keyPayload), errorMessageTemplate, FIELD_RECORD_KEY); + checkArgument(!isNull(source), errorMessageTemplate, RECORD_SOURCE); + checkArgument(!isNull(source.get(FIELD_TABLE)), errorMessageTemplate, FIELD_TABLE); + checkArgument( + !isNull(source.get(RECORD_SOURCE_DB)), errorMessageTemplate, RECORD_SOURCE_DB); + checkArgument(!isNull(root.get(OP_FIELD)), errorMessageTemplate, OP_FIELD); + if (root.get(OP_FIELD).asText().equals(OP_DELETE)) { + checkArgument(!isNull(root.get(FIELD_BEFORE)), errorMessageTemplate, FIELD_BEFORE); + } else { + checkArgument(!isNull(root.get(dataField())), errorMessageTemplate, dataField()); + } + } + + @Override + protected String primaryField() { + // No-op + return null; + } + + @Override + protected String dataField() { + return "after"; + } + + @Override + protected void extractPrimaryKeys() { + primaryKeys = Lists.newArrayList(keyPayload.fieldNames()); + } + + @Override + public void flatMap(Pair record, Collector out) throws Exception { + // Extract debezium record key payload field + if (Objects.nonNull(record.getKey())) { + this.extractKeyPayload( + OBJECT_MAPPER.readValue((String) record.getKey(), JsonNode.class)); + } + // Extract debezium record value payload field + if (Objects.nonNull(record.getRight())) { + this.extractValuePayload( + OBJECT_MAPPER.readValue((String) record.getValue(), JsonNode.class)); + } + this.validateFormat(); + databaseName = extractStringFromRootJson(FIELD_DATABASE); + tableName = extractStringFromRootJson(FIELD_TABLE); + extractRecords().forEach(out::collect); + } + + @Override + protected void parseKeyIfNeed(String key) { + if (StringUtils.isBlank(key)) { + return; + } + try { + // For extract primary key + JsonNode recordData = OBJECT_MAPPER.readValue(key, JsonNode.class); + extractKeyPayload(recordData); + } catch (JsonProcessingException e) { + throw new RuntimeException("Error processing record key JSON: " + key, e); + } + } + + private void extractKeyPayload(JsonNode recordData) { + if (includeSchema(recordData)) { + keyPayload = recordData.get(RECORD_PAYLOAD); + } else { + keyPayload = recordData; + } + } + + @Override + protected void parseRootJson(String record) { + try { + JsonNode recordData = OBJECT_MAPPER.readValue(record, JsonNode.class); + extractValuePayload(recordData); + } catch (JsonProcessingException e) { + throw new RuntimeException("Error processing JSON: " + record, e); + } + } + + private void extractValuePayload(JsonNode recordData) { + // for extract record value + if (includeSchema(recordData)) { + root = recordData.get(RECORD_PAYLOAD); + } else { + root = recordData; + } + // For extract record metadata + source = root.get(RECORD_SOURCE); + } + + @Override + protected String extractStringFromRootJson(String key) { + if (key.equals(FIELD_TABLE)) { + tableName = source.get(FIELD_TABLE).asText(); + return tableName; + } else if (key.equals(FIELD_DATABASE)) { + this.databaseName = source.get(RECORD_SOURCE_DB).asText(); + return databaseName; + } + return root.get(key) != null ? 
root.get(key).asText() : null; + } + + private boolean includeSchema(JsonNode record) { + return record.size() == 2 && record.has(RECORD_SCHEMA) && record.has(RECORD_PAYLOAD); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java deleted file mode 100644 index b1d858bf4e821..0000000000000 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumRecordParser.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.action.cdc.format.debezium; - -import org.apache.paimon.flink.action.cdc.ComputedColumn; -import org.apache.paimon.flink.action.cdc.TypeMapping; -import org.apache.paimon.flink.action.cdc.format.RecordParser; -import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; -import org.apache.paimon.types.RowKind; - -import java.util.ArrayList; -import java.util.List; - -import static org.apache.flink.shaded.guava30.com.google.common.base.Preconditions.checkArgument; -import static org.apache.paimon.utils.JsonSerdeUtil.isNull; - -/** - * The {@code DebeziumRecordParser} class extends the abstract {@link RecordParser} and is designed - * to parse records from Debezium's JSON change data capture (CDC) format. Debezium is a CDC - * solution for MySQL databases that captures row-level changes to database tables and outputs them - * in JSON format. This parser extracts relevant information from the Debezium-JSON format and - * converts it into a list of {@link RichCdcMultiplexRecord} objects. - * - *
<p>
The class supports various database operations such as INSERT, UPDATE, DELETE, and READ - * (snapshot reads), and creates corresponding {@link RichCdcMultiplexRecord} objects to represent - * these changes. - * - *
<p>
Validation is performed to ensure that the JSON records contain all necessary fields, - * including the 'before' and 'after' states for UPDATE operations, and the class also supports - * schema extraction for the Kafka topic. Debezium's specific fields such as 'source', 'op' for - * operation type, and primary key field names are used to construct the details of each record - * event. - */ -public class DebeziumRecordParser extends RecordParser { - - private static final String FIELD_BEFORE = "before"; - private static final String FIELD_AFTER = "after"; - private static final String FIELD_SOURCE = "source"; - private static final String FIELD_PRIMARY = "pkNames"; - private static final String FIELD_DB = "db"; - private static final String FIELD_TYPE = "op"; - private static final String OP_INSERT = "c"; - private static final String OP_UPDATE = "u"; - private static final String OP_DELETE = "d"; - private static final String OP_READE = "r"; - - public DebeziumRecordParser( - boolean caseSensitive, TypeMapping typeMapping, List computedColumns) { - super(caseSensitive, typeMapping, computedColumns); - } - - @Override - public List extractRecords() { - String operation = extractStringFromRootJson(FIELD_TYPE); - List records = new ArrayList<>(); - switch (operation) { - case OP_INSERT: - case OP_READE: - processRecord(root.get(dataField()), RowKind.INSERT, records); - break; - case OP_UPDATE: - processRecord( - mergeOldRecord(root.get(dataField()), root.get(FIELD_BEFORE)), - RowKind.DELETE, - records); - processRecord(root.get(dataField()), RowKind.INSERT, records); - break; - case OP_DELETE: - processRecord(root.get(FIELD_BEFORE), RowKind.DELETE, records); - break; - default: - throw new UnsupportedOperationException("Unknown record operation: " + operation); - } - return records; - } - - @Override - protected void validateFormat() { - String errorMessageTemplate = - "Didn't find '%s' node in json. Please make sure your topic's format is correct."; - checkArgument( - !isNull(root.get(FIELD_SOURCE).get(FIELD_TABLE)), - errorMessageTemplate, - FIELD_TABLE); - checkArgument( - !isNull(root.get(FIELD_SOURCE).get(FIELD_DB)), - errorMessageTemplate, - FIELD_DATABASE); - checkArgument(!isNull(root.get(FIELD_TYPE)), errorMessageTemplate, FIELD_TYPE); - String operation = root.get(FIELD_TYPE).asText(); - switch (operation) { - case OP_INSERT: - case OP_READE: - checkArgument(!isNull(root.get(dataField())), errorMessageTemplate, dataField()); - break; - case OP_UPDATE: - case OP_DELETE: - checkArgument(!isNull(root.get(FIELD_BEFORE)), errorMessageTemplate, FIELD_BEFORE); - break; - default: - throw new IllegalArgumentException("Unsupported operation type: " + operation); - } - checkArgument(!isNull(root.get(primaryField())), errorMessageTemplate, primaryField()); - } - - @Override - protected String primaryField() { - return FIELD_PRIMARY; - } - - @Override - protected String dataField() { - return FIELD_AFTER; - } - - @Override - protected String extractStringFromRootJson(String key) { - if (key.equals(FIELD_TABLE)) { - tableName = root.get(FIELD_SOURCE).get(FIELD_TABLE).asText(); - return tableName; - } else if (key.equals(FIELD_DATABASE)) { - databaseName = root.get(FIELD_SOURCE).get(FIELD_DB).asText(); - return databaseName; - } - return root.get(key) != null ? 
root.get(key).asText() : null; - } -} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java index 27092ea113cb6..eaaa705fa3ab5 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionUtils.java @@ -20,15 +20,14 @@ import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; import org.apache.paimon.flink.action.cdc.format.DataFormat; -import org.apache.paimon.flink.action.cdc.format.debezium.JsonPrimaryKeyDeserializationSchema; -import org.apache.paimon.utils.JsonSerdeUtil; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.StringUtils; -import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; +import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.ScanStartupMode; @@ -36,6 +35,7 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.util.CollectionUtil; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; @@ -55,7 +55,6 @@ import java.util.Properties; import java.util.UUID; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.SCAN_STARTUP_SPECIFIC_OFFSETS; import static org.apache.paimon.utils.Preconditions.checkArgument; @@ -67,16 +66,10 @@ public class KafkaActionUtils { private static final String PARTITION = "partition"; private static final String OFFSET = "offset"; - private static final String DEBEZIUM_JSON = "debezium-json"; - public static KafkaSource buildKafkaSource(Configuration kafkaConfig) { - return buildKafkaSource(kafkaConfig, new ArrayList<>()); - } - - public static KafkaSource buildKafkaSource( - Configuration kafkaConfig, List primaryKeys) { + public static KafkaSource buildKafkaSource(Configuration kafkaConfig) { validateKafkaConfig(kafkaConfig); - KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder(); + KafkaSourceBuilder kafkaSourceBuilder = KafkaSource.builder(); List topics = kafkaConfig.get(KafkaConnectorOptions.TOPIC).stream() @@ -85,11 +78,11 @@ public static KafkaSource buildKafkaSource( kafkaSourceBuilder .setTopics(topics) - .setGroupId(kafkaPropertiesGroupId(kafkaConfig)) - .setValueOnlyDeserializer( - DEBEZIUM_JSON.equals(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT)) - ? 
new JsonPrimaryKeyDeserializationSchema(primaryKeys) - : new SimpleStringSchema()); + .setDeserializer( + KafkaRecordDeserializationSchema.of( + new KafkaKeyValueDeserializationSchema())) + // .setValueOnlyDeserializer(new SimpleStringSchema()) + .setGroupId(kafkaPropertiesGroupId(kafkaConfig)); Properties properties = new Properties(); for (Map.Entry entry : kafkaConfig.toMap().entrySet()) { String key = entry.getKey(); @@ -273,11 +266,6 @@ static DataFormat getDataFormat(Configuration kafkaConfig) { static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer( Configuration kafkaConfig, String topic) { - return getKafkaEarliestConsumer(kafkaConfig, topic, new ArrayList<>()); - } - - static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer( - Configuration kafkaConfig, String topic, List primaryKeys) { Properties props = new Properties(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, @@ -306,33 +294,26 @@ static MessageQueueSchemaUtils.ConsumerWrapper getKafkaEarliestConsumer( Collections.singletonList(new TopicPartition(topic, firstPartition)); consumer.assign(topicPartitions); consumer.seekToBeginning(topicPartitions); - return new KafkaConsumerWrapper( - consumer, - DEBEZIUM_JSON.equals(kafkaConfig.get(KafkaConnectorOptions.VALUE_FORMAT)) - ? primaryKeys - : new ArrayList<>()); + + return new KafkaConsumerWrapper(consumer); } private static class KafkaConsumerWrapper implements MessageQueueSchemaUtils.ConsumerWrapper { - private static final String PK_NAMES_KEY = "pkNames"; - private final KafkaConsumer consumer; - private final List pkNames; - - KafkaConsumerWrapper(KafkaConsumer kafkaConsumer, List pkNames) { + KafkaConsumerWrapper(KafkaConsumer kafkaConsumer) { this.consumer = kafkaConsumer; - this.pkNames = pkNames; } @Override - public List getRecords(String topic, int pollTimeOutMills) { + public List> getRecords(String topic, int pollTimeOutMills) { ConsumerRecords consumerRecords = consumer.poll(Duration.ofMillis(pollTimeOutMills)); - return StreamSupport.stream(consumerRecords.records(topic).spliterator(), false) - .map(r -> JsonSerdeUtil.putArrayToJsonString(r.value(), PK_NAMES_KEY, pkNames)) - .collect(Collectors.toList()); + Iterable> records = consumerRecords.records(topic); + List> result = new ArrayList<>(); + records.forEach(r -> result.add(Pair.of(r.key(), r.value()))); + return result; } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java new file mode 100644 index 0000000000000..853fe75d057b0 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.apache.paimon.utils.Pair; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.nio.charset.StandardCharsets; + +/** + * DeserializationSchema that deserializes the key and value of Kafka's record into a Pair object. + * + *
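 * <p>A minimal usage sketch (this mirrors how KafkaActionUtils in this patch plugs the schema into
 * the Kafka source builder; all other required Kafka options are omitted here):
 *
 * <pre>{@code
 * KafkaSourceBuilder<Pair<String, String>> builder = KafkaSource.builder();
 * // Each Kafka record is mapped to Pair.of(key, value), both decoded as UTF-8 strings.
 * builder.setDeserializer(
 *         KafkaRecordDeserializationSchema.of(new KafkaKeyValueDeserializationSchema()));
 * }</pre>
 *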
<p>
Key fields can be accessed by calling pair.getKey() + * + *
<p>
Value fields can be accessed by calling pair.getValue() + */ +public class KafkaKeyValueDeserializationSchema implements KafkaDeserializationSchema { + + @Override + public void open(DeserializationSchema.InitializationContext context) throws Exception {} + + @Override + public boolean isEndOfStream(Pair nextRecord) { + return false; + } + + @Override + public Pair deserialize(ConsumerRecord record) throws Exception { + return Pair.of( + record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8), + record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8)); + } + + @Override + public TypeInformation getProducedType() { + return TypeExtractor.getForClass(Pair.class); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java index 7cf8962409a36..d8e84b2201525 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java @@ -31,6 +31,7 @@ import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser; import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordSchemaBuilder; +import org.apache.paimon.utils.Pair; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.configuration.Configuration; @@ -149,7 +150,7 @@ public void build() throws Exception { catalog.createDatabase(database, true); - KafkaSource source = KafkaActionUtils.buildKafkaSource(kafkaConfig); + KafkaSource source = KafkaActionUtils.buildKafkaSource(kafkaConfig); DataFormat format = getDataFormat(kafkaConfig); RecordParser recordParser = diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java index 0f5ce4f1b3f24..de301d26cc683 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java @@ -21,6 +21,7 @@ import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase; import org.apache.paimon.flink.action.cdc.format.DataFormat; +import org.apache.paimon.utils.Pair; import org.apache.flink.api.connector.source.Source; import org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions; @@ -40,8 +41,8 @@ public KafkaSyncTableAction( } @Override - protected Source buildSource() { - return KafkaActionUtils.buildKafkaSource(mqConfig, primaryKeys); + protected Source buildSource() { + return KafkaActionUtils.buildKafkaSource(mqConfig); } @Override @@ -51,7 +52,7 @@ protected String topic() { @Override protected MessageQueueSchemaUtils.ConsumerWrapper consumer(String topic) { - return KafkaActionUtils.getKafkaEarliestConsumer(mqConfig, topic, primaryKeys); + return KafkaActionUtils.getKafkaEarliestConsumer(mqConfig, topic); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java index da9a1d1f1ee53..d7e81d7499dd3 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarActionUtils.java @@ -20,8 +20,8 @@ import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; import org.apache.paimon.flink.action.cdc.format.DataFormat; +import org.apache.paimon.utils.Pair; -import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; @@ -151,11 +151,11 @@ public class PulsarActionUtils { .defaultValue(true) .withDescription("To specify the boundedness of a stream."); - static PulsarSource buildPulsarSource(Configuration rawConfig) { + static PulsarSource buildPulsarSource(Configuration rawConfig) { Configuration pulsarConfig = preprocessPulsarConfig(rawConfig); validatePulsarConfig(pulsarConfig); - PulsarSourceBuilder pulsarSourceBuilder = PulsarSource.builder(); + PulsarSourceBuilder pulsarSourceBuilder = PulsarSource.builder(); // the minimum setup pulsarSourceBuilder @@ -166,7 +166,7 @@ static PulsarSource buildPulsarSource(Configuration rawConfig) { Arrays.stream(pulsarConfig.get(TOPIC).split(",")) .map(String::trim) .collect(Collectors.toList())) - .setDeserializationSchema(new SimpleStringSchema()); + .setDeserializationSchema(new PulsarKeyValueDeserializationSchema()); // other settings @@ -367,12 +367,12 @@ private static class PulsarConsumerWrapper implements MessageQueueSchemaUtils.Co } @Override - public List getRecords(String topic, int pollTimeOutMills) { + public List> getRecords(String topic, int pollTimeOutMills) { try { Message message = consumer.receive(pollTimeOutMills, TimeUnit.MILLISECONDS); return message == null ? Collections.emptyList() - : Collections.singletonList(message.getValue()); + : Collections.singletonList(Pair.of(message.getKey(), message.getValue())); } catch (PulsarClientException e) { throw new RuntimeException(e); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java new file mode 100644 index 0000000000000..58df6ab9d9b13 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.action.cdc.pulsar; + +import org.apache.paimon.utils.Pair; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; +import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; +import org.apache.flink.util.Collector; +import org.apache.pulsar.client.api.Message; + +import java.nio.charset.StandardCharsets; + +/** + * DeserializationSchema that deserializes the key and value of Pulsar's record into a Pair object. + * + *
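 * <p>A minimal usage sketch (this mirrors how PulsarActionUtils in this patch plugs the schema into
 * the Pulsar source builder; all other required Pulsar options are omitted here):
 *
 * <pre>{@code
 * PulsarSourceBuilder<Pair<String, String>> builder = PulsarSource.builder();
 * // Each Pulsar message is mapped to Pair.of(key, valueDecodedAsUtf8String).
 * builder.setDeserializationSchema(new PulsarKeyValueDeserializationSchema());
 * }</pre>
 *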
<p>
Key fields can be accessed by calling pair.getKey() + * + *
<p>
Value fields can be accessed by calling pair.getValue() + */ +public class PulsarKeyValueDeserializationSchema implements PulsarDeserializationSchema { + + public void open( + PulsarDeserializationSchema.PulsarInitializationContext context, + SourceConfiguration configuration) + throws Exception {} + + public void deserialize(Message message, Collector out) throws Exception { + out.collect( + Pair.of(message.getKey(), new String(message.getData(), StandardCharsets.UTF_8))); + } + + @Override + public TypeInformation getProducedType() { + return TypeExtractor.getForClass(Pair.class); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java index c1ccd2799023b..b9d86fc7c523b 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarSyncTableAction.java @@ -21,6 +21,7 @@ import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils; import org.apache.paimon.flink.action.cdc.MessageQueueSyncTableActionBase; import org.apache.paimon.flink.action.cdc.format.DataFormat; +import org.apache.paimon.utils.Pair; import org.apache.flink.api.connector.source.Source; import org.apache.pulsar.client.api.PulsarClientException; @@ -40,7 +41,7 @@ public PulsarSyncTableAction( } @Override - protected Source buildSource() { + protected Source buildSource() { return PulsarActionUtils.buildPulsarSource(mqConfig); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java index efd1f432e56dc..bd0473e82dda8 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreFactory.java @@ -48,9 +48,10 @@ import static org.apache.paimon.CoreOptions.LogConsistency; import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS; import static org.apache.paimon.flink.factories.FlinkFactoryUtil.createFlinkTableFactoryHelper; -import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC; + import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey; + /** The Kafka {@link LogStoreTableFactory} implementation. 
*/ public class KafkaLogStoreFactory implements LogStoreTableFactory { @@ -64,7 +65,7 @@ public String identifier() { } private String topic(Context context) { - return context.getCatalogTable().getOptions().get(TOPIC.key()); + return context.getCatalogTable().getOptions().get(KafkaLogOptions.TOPIC.key()); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java index c93889b66e7b1..220047994b188 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/kafka/KafkaLogStoreRegister.java @@ -42,7 +42,6 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.LOG_SYSTEM_REPLICATION; import static org.apache.paimon.flink.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS; import static org.apache.paimon.flink.kafka.KafkaLogOptions.TOPIC; -import static org.apache.paimon.flink.kafka.KafkaLogStoreFactory.toKafkaProperties; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** KafkaLogStoreRegister is used to register/unregister topics in Kafka for paimon table. */ @@ -95,7 +94,7 @@ public KafkaLogStoreRegister(LogStoreTableFactory.RegisterContext context) { this.replicationFactor = context.getOptions().get(LOG_SYSTEM_REPLICATION); - this.properties = toKafkaProperties(context.getOptions()); + this.properties = KafkaLogStoreFactory.toKafkaProperties(context.getOptions()); } @Override diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseActionITCase.java new file mode 100644 index 0000000000000..307fd98e6b1c2 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncDatabaseActionITCase.java @@ -0,0 +1,510 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.action.cdc.kafka; + +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.testutils.assertj.AssertionUtils; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** IT cases for {@link KafkaSyncDatabaseAction}. */ +public class KafkaDebeziumSyncDatabaseActionITCase extends KafkaActionITCaseBase { + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Test + @Timeout(120) + public void testSchemaEvolutionMultiTopic() throws Exception { + final String topic1 = "schema_evolution_0"; + final String topic2 = "schema_evolution_1"; + boolean writeOne = false; + int fileCount = 2; + List topics = Arrays.asList(topic1, topic2); + topics.forEach(topic -> createTestTopic(topic, 1, 1)); + + // ---------- Write the debezium json into Kafka ------------------- + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + topics.get(i), + readLines( + "kafka/debezium/database/schemaevolution/topic" + + i + + "/debezium-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + } + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", String.join(";", topics)); + KafkaSyncDatabaseAction action = + syncDatabaseActionBuilder(kafkaConfig) + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + testSchemaEvolutionImpl(topics, writeOne, fileCount); + } + + @Test + @Timeout(120) + public void testSchemaEvolutionOneTopic() throws Exception { + final String topic = "schema_evolution"; + boolean writeOne = true; + int fileCount = 2; + List topics = Collections.singletonList(topic); + topics.forEach(t -> createTestTopic(t, 1, 1)); + + // ---------- Write the debezium json into Kafka ------------------- + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + topics.get(0), + readLines( + "kafka/debezium/database/schemaevolution/topic" + + i + + "/debezium-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + } + + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", String.join(";", topics)); + KafkaSyncDatabaseAction action = + syncDatabaseActionBuilder(kafkaConfig) + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + testSchemaEvolutionImpl(topics, writeOne, fileCount); + } + + private void testSchemaEvolutionImpl(List topics, boolean writeOne, int fileCount) + throws Exception { + waitingTables("t1", "t2"); + + FileStoreTable table1 = getFileStoreTable("t1"); + FileStoreTable table2 = getFileStoreTable("t2"); + + RowType rowType1 = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + 
DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys1 = Collections.singletonList("id"); + List expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14]", + "+I[102, car battery, 12V car battery, 8.1]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + RowType rowType2 = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys2 = Collections.singletonList("id"); + List expected2 = + Arrays.asList( + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75]"); + waitForResult(expected2, table2, rowType2, primaryKeys2); + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + writeOne ? topics.get(0) : topics.get(i), + readLines( + "kafka/debezium/database/schemaevolution/topic" + + i + + "/debezium-data-2.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + } + + rowType1 = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "age"}); + expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]", + "+I[102, car battery, 12V car battery, 8.1, NULL]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + rowType2 = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "address"}); + expected = + Arrays.asList( + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, Beijing]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, Shanghai]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + } + + @Test + public void testTopicIsEmpty() { + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + + KafkaSyncDatabaseAction action = syncDatabaseActionBuilder(kafkaConfig).build(); + + assertThatThrownBy(action::run) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, + "kafka-conf [topic] must be specified.")); + } + + @Test + @Timeout(120) + public void testTableAffixMultiTopic() throws Exception { + // create table t1 + createFileStoreTable( + "test_prefix_t1_test_suffix", + RowType.of( + new DataType[] { + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap()); + + final String topic1 = "prefix_suffix_0"; + final String topic2 = "prefix_suffix_1"; + boolean writeOne = false; + int fileCount = 2; + List topics = Arrays.asList(topic1, topic2); + topics.forEach(topic -> createTestTopic(topic, 1, 1)); + + // ---------- Write the debezium json into Kafka ------------------- + + for (int i = 0; i < topics.size(); i++) { + try { + writeRecordsToKafka( + topics.get(i), + readLines( + 
"kafka/debezium/database/prefixsuffix/topic" + + i + + "/debezium-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + } + + // try synchronization + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", String.join(";", topics)); + KafkaSyncDatabaseAction action = + syncDatabaseActionBuilder(kafkaConfig) + .withTablePrefix("test_prefix_") + .withTableSuffix("_test_suffix") + .withTableConfig(getBasicTableConfig()) + // test including check with affix + .includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*") + .build(); + runActionWithDefaultEnv(action); + + testTableAffixImpl(topics, writeOne, fileCount); + } + + @Test + @Timeout(120) + public void testTableAffixOneTopic() throws Exception { + // create table t1 + createFileStoreTable( + "test_prefix_t1_test_suffix", + RowType.of( + new DataType[] { + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}), + Collections.emptyList(), + Collections.singletonList("id"), + Collections.emptyMap()); + + final String topic1 = "prefix_suffix"; + List topics = Collections.singletonList(topic1); + boolean writeOne = true; + int fileCount = 2; + topics.forEach(topic -> createTestTopic(topic, 1, 1)); + + // ---------- Write the debezium json into Kafka ------------------- + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + topics.get(0), + readLines( + "kafka/debezium/database/prefixsuffix/topic" + + i + + "/debezium-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + } + + // try synchronization + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", String.join(";", topics)); + KafkaSyncDatabaseAction action = + syncDatabaseActionBuilder(kafkaConfig) + .withTablePrefix("test_prefix_") + .withTableSuffix("_test_suffix") + .withTableConfig(getBasicTableConfig()) + // test including check with affix + .includingTables(ThreadLocalRandom.current().nextBoolean() ? 
"t1|t2" : ".*") + .build(); + runActionWithDefaultEnv(action); + + testTableAffixImpl(topics, writeOne, fileCount); + } + + private void testTableAffixImpl(List topics, boolean writeOne, int fileCount) + throws Exception { + waitingTables("test_prefix_t1_test_suffix", "test_prefix_t2_test_suffix"); + + FileStoreTable table1 = getFileStoreTable("test_prefix_t1_test_suffix"); + FileStoreTable table2 = getFileStoreTable("test_prefix_t2_test_suffix"); + + RowType rowType1 = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys1 = Collections.singletonList("id"); + List expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14]", + "+I[102, car battery, 12V car battery, 8.1]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + RowType rowType2 = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys2 = Collections.singletonList("id"); + expected = + Arrays.asList( + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + + for (int i = 0; i < fileCount; i++) { + try { + writeRecordsToKafka( + writeOne ? topics.get(0) : topics.get(i), + readLines( + "kafka/debezium/database/prefixsuffix/topic" + + i + + "/debezium-data-2.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + } + rowType1 = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "address"}); + expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14, Beijing]", + "+I[102, car battery, 12V car battery, 8.1, Shanghai]"); + waitForResult(expected, table1, rowType1, primaryKeys1); + + rowType2 = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight", "age"}); + expected = + Arrays.asList( + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 19]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75, 25]"); + waitForResult(expected, table2, rowType2, primaryKeys2); + } + + @Test + @Timeout(60) + public void testIncludingTables() throws Exception { + includingAndExcludingTablesImpl( + "flink|paimon.+", + null, + Arrays.asList("flink", "paimon_1", "paimon_2"), + Collections.singletonList("ignore")); + } + + @Test + @Timeout(60) + public void testExcludingTables() throws Exception { + includingAndExcludingTablesImpl( + null, + "flink|paimon.+", + Collections.singletonList("ignore"), + Arrays.asList("flink", "paimon_1", "paimon_2")); + } + + @Test + @Timeout(120) + public void testIncludingAndExcludingTables() throws Exception { + includingAndExcludingTablesImpl( + "flink|paimon.+", + "paimon_1", + Arrays.asList("flink", "paimon_2"), + Arrays.asList("paimon_1", "ignore")); + } + + private void includingAndExcludingTablesImpl( + @Nullable String includingTables, + @Nullable String excludingTables, + List existedTables, + List 
notExistedTables) + throws Exception { + final String topic1 = "include_exclude" + UUID.randomUUID(); + List topics = Collections.singletonList(topic1); + topics.forEach(topic -> createTestTopic(topic, 1, 1)); + + // ---------- Write the debezium json into Kafka ------------------- + try { + writeRecordsToKafka( + topics.get(0), + readLines("kafka/debezium/database/include/topic0/debezium-data-1.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + // try synchronization + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", String.join(";", topics)); + KafkaSyncDatabaseAction action = + syncDatabaseActionBuilder(kafkaConfig) + .includingTables(includingTables) + .excludingTables(excludingTables) + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + // check paimon tables + waitingTables(existedTables); + assertTableNotExists(notExistedTables); + } + + @Override + protected void writeRecordsToKafka(String topic, List lines) throws Exception { + Properties producerProperties = getStandardProps(); + producerProperties.setProperty("retries", "0"); + producerProperties.put( + "key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProperties.put( + "value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer kafkaProducer = new KafkaProducer(producerProperties); + for (int i = 0; i < lines.size(); i++) { + try { + String[] keyValue = lines.get(i).split(";"); + if (keyValue.length < 2) { + continue; + } + objectMapper.readTree(keyValue[0]); + objectMapper.readTree(keyValue[1]); + kafkaProducer.send(new ProducerRecord<>(topic, keyValue[0], keyValue[1])); + } catch (Exception e) { + // ignore + } + } + kafkaProducer.close(); + } +} diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java index f73734df30bb7..4b6b6adf1cf51 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaDebeziumSyncTableActionITCase.java @@ -23,6 +23,10 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowType; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -30,12 +34,18 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; + +import static org.apache.paimon.testutils.assertj.AssertionUtils.anyCauseMatches; +import static org.assertj.core.api.Assertions.assertThatThrownBy; -/** IT cases for {@link KafkaDebeziumSyncTableActionITCase}. */ +/** IT cases for {@link KafkaSyncTableAction}. 
*/ public class KafkaDebeziumSyncTableActionITCase extends KafkaActionITCaseBase { + private final ObjectMapper objectMapper = new ObjectMapper(); + @Test - @Timeout(60) + @Timeout(120) public void testSchemaEvolution() throws Exception { runSingleTableSchemaEvolution("schemaevolution"); } @@ -60,7 +70,6 @@ private void runSingleTableSchemaEvolution(String sourceDir) throws Exception { .withTableConfig(getBasicTableConfig()) .build(); runActionWithDefaultEnv(action); - testSchemaEvolutionImpl(topic, sourceDir); } @@ -140,8 +149,321 @@ private void testSchemaEvolutionImpl(String topic, String sourceDir) throws Exce waitForResult(expected, table, rowType, primaryKeys); } + @Test + @Timeout(120) + public void testNotSupportFormat() throws Exception { + final String topic = "not_support"; + createTestTopic(topic, 1, 1); + // ---------- Write the debezium json into Kafka ------------------- + List lines = readLines("kafka/debezium/table/schemaevolution/debezium-data-1.txt"); + try { + writeRecordsToKafka(topic, lines); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debeziums-json"); + kafkaConfig.put("topic", topic); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + + assertThatThrownBy(action::run) + .satisfies( + anyCauseMatches( + UnsupportedOperationException.class, + "This format: debeziums-json is not supported.")); + } + + @Test + @Timeout(120) + public void testAssertSchemaCompatible() throws Exception { + final String topic = "assert_schema_compatible"; + createTestTopic(topic, 1, 1); + // ---------- Write the debezium json into Kafka ------------------- + List lines = readLines("kafka/debezium/table/schemaevolution/debezium-data-1.txt"); + try { + writeRecordsToKafka(topic, lines); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", topic); + + // create an incompatible table + createFileStoreTable( + RowType.of( + new DataType[] {DataTypes.STRING(), DataTypes.STRING()}, + new String[] {"k", "v1"}), + Collections.emptyList(), + Collections.singletonList("k"), + Collections.emptyMap()); + + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + + assertThatThrownBy(action::run) + .satisfies( + anyCauseMatches( + IllegalArgumentException.class, + "Paimon schema and source table schema are not compatible.\n" + + "Paimon fields are: [`k` STRING NOT NULL, `v1` STRING].\n" + + "Source table fields are: [`id` STRING NOT NULL, `name` STRING, `description` STRING, `weight` STRING]")); + } + @Test @Timeout(60) + public void testStarUpOptionSpecific() throws Exception { + final String topic = "start_up_specific"; + createTestTopic(topic, 1, 1); + // ---------- Write the debezium json into Kafka ------------------- + List lines = readLines("kafka/debezium/table/startupmode/debezium-data-1.txt"); + try { + writeRecordsToKafka(topic, lines); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", topic); + kafkaConfig.put("scan.startup.mode", 
"specific-offsets"); + kafkaConfig.put("scan.startup.specific-offsets", "partition:0,offset:1"); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + // topic has two records we read two + List expected = + Collections.singletonList("+I[102, car battery, 12V car battery, 8.1]"); + waitForResult(expected, table, rowType, primaryKeys); + } + + @Test + @Timeout(120) + public void testStarUpOptionLatest() throws Exception { + final String topic = "start_up_latest"; + createTestTopic(topic, 1, 1); + // ---------- Write the debezium json into Kafka ------------------- + List lines = readLines("kafka/debezium/table/startupmode/debezium-data-1.txt"); + try { + writeRecordsToKafka(topic, lines); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", topic); + kafkaConfig.put("scan.startup.mode", "latest-offset"); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + Thread.sleep(5000); + FileStoreTable table = getFileStoreTable(tableName); + try { + writeRecordsToKafka( + topic, readLines("kafka/debezium/table/startupmode/debezium-data-2.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + // topic has four records we read two + List expected = + Arrays.asList( + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75]"); + waitForResult(expected, table, rowType, primaryKeys); + } + + @Test + @Timeout(120) + public void testStarUpOptionTimestamp() throws Exception { + final String topic = "start_up_timestamp"; + createTestTopic(topic, 1, 1); + // ---------- Write the debezium json into Kafka ------------------- + List lines = readLines("kafka/debezium/table/startupmode/debezium-data-1.txt"); + try { + writeRecordsToKafka(topic, lines); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", topic); + kafkaConfig.put("scan.startup.mode", "timestamp"); + kafkaConfig.put( + "scan.startup.timestamp-millis", String.valueOf(System.currentTimeMillis())); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + try { + writeRecordsToKafka( + topic, readLines("kafka/debezium/table/startupmode/debezium-data-2.txt")); + } catch (Exception e) { + throw new Exception("Failed 
to write debezium data to Kafka.", e); + } + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + // topic has four records we read two + List expected = + Arrays.asList( + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75]"); + waitForResult(expected, table, rowType, primaryKeys); + } + + @Test + @Timeout(120) + public void testStarUpOptionEarliest() throws Exception { + final String topic = "start_up_earliest"; + createTestTopic(topic, 1, 1); + // ---------- Write the debezium json into Kafka ------------------- + List lines = readLines("kafka/debezium/table/startupmode/debezium-data-1.txt"); + try { + writeRecordsToKafka(topic, lines); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", topic); + kafkaConfig.put("scan.startup.mode", "earliest-offset"); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + try { + writeRecordsToKafka( + topic, readLines("kafka/debezium/table/startupmode/debezium-data-2.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + // topic has four records we read all + List expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14]", + "+I[102, car battery, 12V car battery, 8.1]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75]"); + waitForResult(expected, table, rowType, primaryKeys); + } + + @Test + @Timeout(120) + public void testStarUpOptionGroup() throws Exception { + final String topic = "start_up_group"; + createTestTopic(topic, 1, 1); + // ---------- Write the debezium json into Kafka ------------------- + List lines = readLines("kafka/debezium/table/startupmode/debezium-data-1.txt"); + try { + writeRecordsToKafka(topic, lines); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + Map kafkaConfig = getBasicKafkaConfig(); + kafkaConfig.put("value.format", "debezium-json"); + kafkaConfig.put("topic", topic); + kafkaConfig.put("scan.startup.mode", "group-offsets"); + KafkaSyncTableAction action = + syncTableActionBuilder(kafkaConfig) + .withPrimaryKeys("id") + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + + try { + writeRecordsToKafka( + topic, readLines("kafka/debezium/table/startupmode/debezium-data-2.txt")); + } catch (Exception e) { + throw new Exception("Failed to write debezium data to Kafka.", e); + } + FileStoreTable table = getFileStoreTable(tableName); + + RowType rowType = + RowType.of( + new DataType[] { + 
DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"id", "name", "description", "weight"}); + List primaryKeys = Collections.singletonList("id"); + // topic has four records we read all + List expected = + Arrays.asList( + "+I[101, scooter, Small 2-wheel scooter, 3.14]", + "+I[102, car battery, 12V car battery, 8.1]", + "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]", + "+I[104, hammer, 12oz carpenter's hammer, 0.75]"); + waitForResult(expected, table, rowType, primaryKeys); + } + + @Test + @Timeout(120) public void testComputedColumn() throws Exception { String topic = "computed_column"; createTestTopic(topic, 1, 1); @@ -150,15 +472,16 @@ public void testComputedColumn() throws Exception { try { writeRecordsToKafka(topic, lines); } catch (Exception e) { - throw new Exception("Failed to write canal data to Kafka.", e); + throw new Exception("Failed to write debezium data to Kafka.", e); } Map kafkaConfig = getBasicKafkaConfig(); kafkaConfig.put("value.format", "debezium-json"); kafkaConfig.put("topic", topic); KafkaSyncTableAction action = syncTableActionBuilder(kafkaConfig) - .withPrimaryKeys("id") - .withComputedColumnArgs("_year=year(date)") + .withPartitionKeys("_year") + .withPrimaryKeys("_id", "_year") + .withComputedColumnArgs("_year=year(_date)") .withTableConfig(getBasicTableConfig()) .build(); runActionWithDefaultEnv(action); @@ -166,13 +489,40 @@ public void testComputedColumn() throws Exception { RowType rowType = RowType.of( new DataType[] { - DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.INT() + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.INT().notNull() }, - new String[] {"id", "date", "_year"}); + new String[] {"_id", "_date", "_year"}); waitForResult( Collections.singletonList("+I[101, 2023-03-23, 2023]"), getFileStoreTable(tableName), rowType, - Collections.singletonList("id")); + Arrays.asList("_id", "_year")); + } + + @Override + protected void writeRecordsToKafka(String topic, List lines) throws Exception { + Properties producerProperties = getStandardProps(); + producerProperties.setProperty("retries", "0"); + producerProperties.put( + "key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + producerProperties.put( + "value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + KafkaProducer kafkaProducer = new KafkaProducer(producerProperties); + for (int i = 0; i < lines.size(); i++) { + try { + String[] keyValue = lines.get(i).split(";"); + if (keyValue.length < 2) { + continue; + } + objectMapper.readTree(keyValue[0]); + objectMapper.readTree(keyValue[1]); + kafkaProducer.send(new ProducerRecord<>(topic, keyValue[0], keyValue[1])); + } catch (Exception e) { + // ignore + } + } + kafkaProducer.close(); } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/include/topic0/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/include/topic0/debezium-data-1.txt new file mode 100644 index 0000000000000..a3031a5c936eb --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/include/topic0/debezium-data-1.txt @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"schema":null,"payload": {"id":101}};{"schema":null, "payload": {"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"paimon_1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}} +{"schema":null,"payload": {"id":102}};{"schema":null, "payload": {"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"paimon_2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":103}};{"schema":null, "payload": {"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"ignore","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":104}};{"schema":null, "payload": {"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"flink","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic0/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic0/debezium-data-1.txt new file mode 100644 index 0000000000000..2b2a125774962 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic0/debezium-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"id":101};{"schema":null, "payload": {"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}} +{"schema":null,"payload": {"id":102}};{"schema":null, "payload": {"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic0/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic0/debezium-data-2.txt new file mode 100644 index 0000000000000..52333c31ad0a5 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic0/debezium-data-2.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"id":101};{"schema":null, "payload": {"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"address":"Beijing"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}} +{"schema":null,"payload": {"id":102}};{"schema":null, "payload": {"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"address":"Shanghai"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic1/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic1/debezium-data-1.txt new file mode 100644 index 0000000000000..157f6a6f22016 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic1/debezium-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"id":103};{"schema":null, "payload": {"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"t2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":104}};{"schema":null, "payload": {"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"t2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic1/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic1/debezium-data-2.txt new file mode 100644 index 0000000000000..3f6da8b24c062 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/prefixsuffix/topic1/debezium-data-2.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"id":103};{"schema":null, "payload": {"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"age":19},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"t2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":104}};{"schema":null, "payload": {"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"age":25},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database_affix","table":"t2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic0/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic0/debezium-data-1.txt new file mode 100644 index 0000000000000..84280a7c40893 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic0/debezium-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"id":101};{"schema":null, "payload": {"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database","table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}} +{"schema":null,"payload": {"id":102}};{"schema":null, "payload": {"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database","table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic0/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic0/debezium-data-2.txt new file mode 100644 index 0000000000000..faa7429a4b0bf --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic0/debezium-data-2.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"id":103};{"schema":null, "payload": {"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"age":19},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database","table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":104}};{"schema":null, "payload": {"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"age":25},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database","table":"t1","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic1/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic1/debezium-data-1.txt new file mode 100644 index 0000000000000..dd6b4d101d6bd --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic1/debezium-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"schema":null,"payload": {"id":103}};{"schema":null, "payload": {"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database","table":"t2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":104}};{"schema":null, "payload": {"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database","table":"t2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic1/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic1/debezium-data-2.txt new file mode 100644 index 0000000000000..f142db3a632e6 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/database/schemaevolution/topic1/debezium-data-2.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"schema":null,"payload": {"id":103}};{"schema":null, "payload": {"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"address":"Beijing"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database","table":"t2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":104}};{"schema":null, "payload": {"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"address":"Shanghai"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"paimon_sync_database","table":"t2","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt index 5a571d3fd3833..b58af799622cc 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/computedcolumn/debezium-data-1.txt @@ -16,4 +16,4 @@ * limitations under the License. */ -{"before": null, "after": {"id": 101, "date": "2023-03-23"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null} +{"schema": null,"payload": {"_id":101}};{"schema":null, "payload": {"before":null,"after":{"_id":101,"_date":"2023-03-23"},"source":{"version":"2.4.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt index b3ff4e23a3fbd..ef0f607b22630 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-1.txt @@ -16,5 +16,5 @@ * limitations under the License. 
*/ -{"before": null, "after": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null} -{"before": null, "after": {"id": 102, "name": "car battery", "description": "12V car battery", "weight": 8.1}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null} +{"schema":null,"payload": {"id":101}};{"schema":null, "payload": {"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}} +{"schema":null,"payload": {"id":102}};{"schema":null, "payload": {"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt index 528f965223149..9286ce973cf75 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-2.txt @@ -16,5 +16,5 @@ * limitations under the License. 
*/ -{"before": null, "after": {"id": 103, "name": "12-pack drill bits", "description": "12-pack of drill bits with sizes ranging from #40 to #3", "weight": 0.8, "age": 18}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null} -{"before": null, "after": {"id": 104, "name": "hammer", "description": "12oz carpenter's hammer", "weight": 0.75, "age": 24}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null} +{"schema":null,"payload": {"id":103}};{"schema":null, "payload": {"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"age":18},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":104}};{"schema":null, "payload": {"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75, "age":24},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt index 79210f88d9a88..f4316355e0a40 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/schemaevolution/debezium-data-3.txt @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -{"before": null, "after": {"id": 105, "name": "hammer", "description": "14oz carpenter's hammer", "weight": 0.875, "address": "Shanghai"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null} -{"before": {"id": 101, "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14}, "after": null, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "d", "ts_ms": 1596684883000, "transaction": null} -{"before": {"address": "Shanghai"}, "after": {"id": 105, "name": "hammer", "description": "14oz carpenter's hammer", "weight": 0.875, "address": "Beijing"}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684906000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "u", "ts_ms": 1596684906000, "transaction": null} -{"before": null, "after": {"id": 107, "name": "rocks", "description": "box of assorted rocks", "weight": 5.3}, "source": {"version": "1.9.7.Final", "connector": "mysql", "name": "mysql_binlog_source", "ts_ms": 1596684883000, "snapshot": "false", "db": "test", "sequence": null, "table": "product", "server_id": 0, "gtid": null, "file": "", "pos": 0, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1596684883000, "transaction": null} +{"schema":null,"payload": {"id":105}};{"schema":null, "payload": {"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"address":"Shanghai"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":101}};{"schema":null, "payload": {"after":null,"before":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1589355606100,"transaction":null}} +{"schema":null,"payload": {"id":105}};{"schema":null, "payload": {"before":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"address":"Shanghai"},"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"address":"Beijing"},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1589355606101,"transaction":null}} +{"schema":null,"payload": {"id":107}};{"schema":null, "payload": {"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted 
rocks","weight":5.3},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/startupmode/debezium-data-1.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/startupmode/debezium-data-1.txt new file mode 100644 index 0000000000000..5517f213fb649 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/startupmode/debezium-data-1.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"key"},"payload": {"id":101}};{"schema":null, "payload": {"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}} +{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"key"},"payload": {"id":102}};{"schema":null, "payload": {"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} \ No newline at end of file diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/startupmode/debezium-data-2.txt b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/startupmode/debezium-data-2.txt new file mode 100644 index 0000000000000..2afb3a572a98e --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/debezium/table/startupmode/debezium-data-2.txt @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"key"},"payload": {"id":103}};{"schema":null, "payload": {"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}} +{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"key"},"payload": {"id":104}};{"schema":null, "payload": {"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"test","table":"product","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}}
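Note on the test data format: each line in the debezium-data-*.txt resources above encodes one Kafka record as "<key JSON>;<value JSON>". The part before the first semicolon is the message key (plain, or wrapped in a {"schema": ..., "payload": ...} envelope), and the part after it is the Debezium change event whose payload carries before/after/source/op. The writeRecordsToKafka overrides in both IT cases split on ';', validate both sides with Jackson, and send them as the key and value of a ProducerRecord. The snippet below is only a minimal illustrative reader for such a line; it is not the DebeziumJsonRecordParser introduced by this patch, and the class and method names are hypothetical.

    // Shaded Jackson, as imported by the IT cases above (the JsonNode path is assumed
    // to sit next to the ObjectMapper used in the tests).
    import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
    import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

    /** Hypothetical helper, for illustration only: reads one "key;value" test line. */
    class DebeziumTestLineReader {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        /** Returns the row image of the event: the after-image for c/u/r, the before-image for d. */
        static JsonNode readRowImage(String line) throws Exception {
            // Split into key JSON and value JSON; limit 2 keeps any later ';' inside the value.
            String[] keyValue = line.split(";", 2);
            JsonNode value = MAPPER.readTree(keyValue[1]);
            // The test files wrap the change event in {"schema": ..., "payload": {...}}.
            JsonNode event = value.has("payload") ? value.get("payload") : value;
            String op = event.get("op").asText();
            // Deletes carry the row in "before"; creates/updates/reads carry it in "after".
            return "d".equals(op) ? event.get("before") : event.get("after");
        }
    }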