diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
index 332b8fa5b3f22..1aab6653d4d4f 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java
@@ -19,20 +19,14 @@
 package org.apache.paimon.flink.action.cdc.format.debezium;
 
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.format.DataFormat;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.utils.DateTimeUtils;
-import org.apache.paimon.utils.JsonSerdeUtil;
-import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import io.debezium.data.Bits;
 import io.debezium.data.geometry.Geometry;
@@ -52,13 +46,9 @@
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.Base64;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
-import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PAYLOAD;
-import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PRIMARY;
-import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_SCHEMA;
 
 /**
  * Utils to handle 'schema' field in debezium Json. TODO: The methods have many duplicate codes with
@@ -66,53 +56,6 @@
  */
 public class DebeziumSchemaUtils {
 
-    /** Rewrite value. */
-    public static String rewriteValue(Pair<String, String> message, String format) {
-        DataFormat dataFormat = DataFormat.fromConfigString(format);
-        String value = message.getValue();
-        switch (dataFormat) {
-            case DEBEZIUM_JSON:
-                if (StringUtils.isBlank(message.getKey())) {
-                    return value;
-                } else {
-                    String key = message.getKey();
-                    Pair<String, String> keyValue = Pair.of(key, value);
-                    String newValue = extractPrimaryKeys(keyValue);
-                    return newValue;
-                }
-            default:
-                return value;
-        }
-    }
-
-    /** Append primary keys to value. */
-    public static String extractPrimaryKeys(Pair<String, String> record) {
-        String key = record.getKey();
-        if (StringUtils.isBlank(key)) {
-            return record.getValue();
-        }
-        try {
-            List<String> primaryKeys = Lists.newArrayList();
-            JsonNode keyNode = JsonSerdeUtil.fromJson(key, JsonNode.class);
-            JsonNode payload;
-            if (keyNode.has(FIELD_SCHEMA)) {
-                payload = keyNode.get(FIELD_PAYLOAD);
-            } else {
-                payload = keyNode;
-            }
-            payload.fieldNames().forEachRemaining(primaryKeys::add);
-            ObjectNode valueNode = JsonSerdeUtil.fromJson(record.getValue(), ObjectNode.class);
-
-            // append primary keys
-            JsonSerdeUtil.setNode(valueNode, FIELD_PRIMARY, primaryKeys);
-            return JsonSerdeUtil.writeValueAsString(valueNode);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(
-                    "An error occurred when automatically attaching the debezium primary keys to Value",
-                    e);
-        }
-    }
-
     /** Transform raw string value according to schema. */
     public static String transformRawValue(
             @Nullable String rawValue,
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java
deleted file mode 100644
index 0bfe9ef6ff194..0000000000000
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaKeyValueDeserializationSchema.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.kafka;
-
-import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
-import org.apache.paimon.utils.Pair;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-/** Deserialization for kafka key and value. */
-public class KafkaKeyValueDeserializationSchema
-        implements KafkaRecordDeserializationSchema<String> {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(KafkaKeyValueDeserializationSchema.class);
-
-    private String charset;
-    private String format;
-
-    public KafkaKeyValueDeserializationSchema(String format) {
-        this(StandardCharsets.UTF_8.name(), format);
-    }
-
-    public KafkaKeyValueDeserializationSchema(String charset, String format) {
-        this.charset = Preconditions.checkNotNull(charset);
-        this.format = format;
-    }
-
-    @Override
-    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<String> collector)
-            throws IOException {
-        if (record.value() != null) {
-            String value = new String(record.value(), Charset.forName(charset));
-            String key =
-                    record.key() != null
-                            ? new String(record.key(), Charset.forName(charset))
-                            : null;
-            collector.collect(DebeziumSchemaUtils.rewriteValue(Pair.of(key, value), format));
-        } else {
-            // see
-            // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events
-            LOG.info(
-                    "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, "
-                            + "please check your Debezium and Kafka configuration.",
-                    record);
-        }
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return TypeInformation.of(String.class);
-    }
-}
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
deleted file mode 100644
index 5e6b96670bdbd..0000000000000
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaValueOnlyDeserializationSchemaWrapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.action.cdc.kafka;
-
-import org.apache.flink.api.common.serialization.DeserializationSchema;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.util.Collector;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * A class that wraps a {@link DeserializationSchema} as the value deserializer for a {@link
- * ConsumerRecord}.
- *
- * @param <T> the return type of the deserialization.
- */ -class KafkaValueOnlyDeserializationSchemaWrapper implements KafkaRecordDeserializationSchema { - private static final long serialVersionUID = 1L; - private final DeserializationSchema deserializationSchema; - private static final Logger LOG = - LoggerFactory.getLogger(KafkaValueOnlyDeserializationSchemaWrapper.class); - - KafkaValueOnlyDeserializationSchemaWrapper(DeserializationSchema deserializationSchema) { - this.deserializationSchema = deserializationSchema; - } - - @Override - public void open(DeserializationSchema.InitializationContext context) throws Exception { - deserializationSchema.open(context); - } - - @Override - public void deserialize(ConsumerRecord message, Collector out) - throws IOException { - if (message.value() != null) { - deserializationSchema.deserialize(message.value(), out); - } else { - // see - // https://debezium.io/documentation/reference/2.5/connectors/mysql.html#mysql-tombstone-events - LOG.info( - "Found null message value:\n{}\nThis message will be ignored. It might be produced by tombstone-event, " - + "please check your Debezium and Kafka configuration.", - message); - } - } - - @Override - public TypeInformation getProducedType() { - return deserializationSchema.getProducedType(); - } -} diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java deleted file mode 100644 index 55ad8a9fdcf3d..0000000000000 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/pulsar/PulsarKeyValueDeserializationSchema.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.action.cdc.pulsar; - -import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils; -import org.apache.paimon.utils.Pair; - -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; -import org.apache.flink.util.Collector; -import org.apache.flink.util.Preconditions; -import org.apache.pulsar.client.api.Message; - -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -/** Deserialization for Pulsar key and value. 
-public class PulsarKeyValueDeserializationSchema implements PulsarDeserializationSchema<String> {
-
-    private String charset;
-    private String format;
-
-    public PulsarKeyValueDeserializationSchema(String format) {
-        this(StandardCharsets.UTF_8.name(), format);
-    }
-
-    public PulsarKeyValueDeserializationSchema(String charset, String format) {
-        this.charset = Preconditions.checkNotNull(charset);
-        this.format = format;
-    }
-
-    @Override
-    public void deserialize(Message<byte[]> message, Collector<String> collector) throws Exception {
-        String value = new String(message.getValue(), Charset.forName(charset));
-        collector.collect(
-                DebeziumSchemaUtils.rewriteValue(Pair.of(message.getKey(), value), format));
-    }
-
-    @Override
-    public TypeInformation<String> getProducedType() {
-        return TypeInformation.of(String.class);
-    }
-}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
index 4726a4206623c..72225e7df2a87 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaActionITCaseBase.java
@@ -328,8 +328,9 @@ private boolean isRecordLine(String line) {
         }
     }
 
-    private void send(String topic, String record, boolean wait) {
-        Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
+    private void send(String topic, String key, String record, boolean wait) {
+        Future<RecordMetadata> sendFuture =
+                kafkaProducer.send(new ProducerRecord<>(topic, key, record));
         if (wait) {
             try {
                 sendFuture.get();
@@ -339,27 +340,29 @@ private void send(String topic, String record, boolean wait) {
         }
     }
 
-    void writeRecordsToKafka(String topic, Map<String, String> data) throws Exception {
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
-        producerProperties.put(
-                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        producerProperties.put(
-                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(producerProperties);
-        for (Map.Entry<String, String> entry : data.entrySet()) {
+    private void send(String topic, String record, boolean wait) {
+        Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
+        if (wait) {
             try {
-                JsonNode keyNode = objectMapper.readTree(entry.getKey());
-                JsonNode valueNode = objectMapper.readTree(entry.getValue());
-                if (!StringUtils.isEmpty(entry.getValue())) {
-                    kafkaProducer.send(
-                            new ProducerRecord<>(topic, entry.getKey(), entry.getValue()));
-                }
-            } catch (Exception e) {
-                // ignore
+                sendFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
             }
         }
-        kafkaProducer.close();
+    }
+
+    void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat)
+            throws Exception {
+        URL url =
+                KafkaCanalSyncTableActionITCase.class
+                        .getClassLoader()
+                        .getResource(String.format(resourceDirFormat));
+        List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
+        lines.stream()
+                .map(line -> line.split(";"))
+                .filter(keyValues -> (keyValues.length > 1))
+                .filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1]))
+                .forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait));
     }
 
     /** Kafka container extension for junit5. */
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
index 841a380ef0c5f..b6701366df4d2 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableActionITCase.java
@@ -33,7 +33,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -574,17 +573,9 @@ public void testRecordWithPrimaryKeys(String format) throws Exception {
         String topic = "no_schema_include_with_primary_keys";
         createTestTopic(topic, 1, 1);
 
-        List<String> lines =
-                readLines("kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
-        Map<String, String> keyValues = new HashMap<>();
-        for (String line : lines) {
-            String[] splitLines = line.split(";");
-            if (splitLines.length > 1) {
-                keyValues.put(splitLines[0], splitLines[1]);
-            }
-        }
         try {
-            writeRecordsToKafka(topic, keyValues);
+            writeRecordsToKafka(
+                    topic, false, "kafka/debezium/table/schema/primarykeys/debezium-data-1.txt");
         } catch (Exception e) {
             throw new Exception("Failed to write debezium data to Kafka.", e);
         }
@@ -617,19 +608,11 @@ public void testSchemaIncludeRecordWithPrimaryKeys(String format) throws Exception {
        String topic = "schema_include_with_primary_keys";
        createTestTopic(topic, 1, 1);
-
-        List<String> lines =
-                readLines(
-                        "kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
-        Map<String, String> keyValues = new HashMap<>();
-        for (String line : lines) {
-            String[] splitLines = line.split(";");
-            if (splitLines.length > 1) {
-                keyValues.put(splitLines[0], splitLines[1]);
-            }
-        }
         try {
-            writeRecordsToKafka(topic, keyValues);
+            writeRecordsToKafka(
+                    topic,
+                    false,
+                    "kafka/debezium/table/schema/primarykeys/debezium-data-with-schema-1.txt");
         } catch (Exception e) {
             throw new Exception("Failed to write debezium data to Kafka.", e);
         }
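
Note on the fixture format consumed by the new writeRecordsToKafka(topic, wait, resourceDirFormat) helper: each line of the resource file is expected to hold a Debezium record key and record value separated by a semicolon; keyValues[0] is sent as the Kafka message key and keyValues[1] as the message value, and lines where either half fails isRecordLine are silently skipped. A minimal sketch of one such line (field names and values are illustrative, not the actual contents of debezium-data-1.txt):

{"id": 1};{"before": null, "after": {"id": 1, "name": "paimon"}, "op": "c"}

Since the helper splits on every ";", this format assumes the semicolon never appears inside the key or value JSON itself; an embedded semicolon would split the line into more than two parts and truncate the value that gets produced.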