Commit e90d420

fixed

sunxiaojian committed Jun 25, 2024
1 parent cf92903 commit e90d420

Showing 6 changed files with 29 additions and 309 deletions.
org/apache/paimon/flink/action/cdc/format/debezium/DebeziumSchemaUtils.java

@@ -19,20 +19,14 @@
 package org.apache.paimon.flink.action.cdc.format.debezium;
 
 import org.apache.paimon.flink.action.cdc.TypeMapping;
-import org.apache.paimon.flink.action.cdc.format.DataFormat;
 import org.apache.paimon.flink.action.cdc.mysql.MySqlTypeUtils;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.DecimalType;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.JsonSerdeUtil;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.StringUtils;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
-import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 import io.debezium.data.Bits;
 import io.debezium.data.geometry.Geometry;
@@ -52,67 +46,16 @@
 import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.util.Base64;
-import java.util.List;
 import java.util.Map;
 
 import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_STRING;
-import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PAYLOAD;
-import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_PRIMARY;
-import static org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser.FIELD_SCHEMA;
 
 /**
  * Utils to handle 'schema' field in debezium Json. TODO: The methods have many duplicate codes with
  * MySqlRecordParser. Need refactor.
  */
 public class DebeziumSchemaUtils {
 
-    /** Rewrite value. */
-    public static String rewriteValue(Pair<String, String> message, String format) {
-        DataFormat dataFormat = DataFormat.fromConfigString(format);
-        String value = message.getValue();
-        switch (dataFormat) {
-            case DEBEZIUM_JSON:
-                if (StringUtils.isBlank(message.getKey())) {
-                    return value;
-                } else {
-                    String key = message.getKey();
-                    Pair<String, String> keyValue = Pair.of(key, value);
-                    String newValue = extractPrimaryKeys(keyValue);
-                    return newValue;
-                }
-            default:
-                return value;
-        }
-    }
-
-    /** Append primary keys to value. */
-    public static String extractPrimaryKeys(Pair<String, String> record) {
-        String key = record.getKey();
-        if (StringUtils.isBlank(key)) {
-            return record.getValue();
-        }
-        try {
-            List<String> primaryKeys = Lists.newArrayList();
-            JsonNode keyNode = JsonSerdeUtil.fromJson(key, JsonNode.class);
-            JsonNode payload;
-            if (keyNode.has(FIELD_SCHEMA)) {
-                payload = keyNode.get(FIELD_PAYLOAD);
-            } else {
-                payload = keyNode;
-            }
-            payload.fieldNames().forEachRemaining(primaryKeys::add);
-            ObjectNode valueNode = JsonSerdeUtil.fromJson(record.getValue(), ObjectNode.class);
-
-            // append primary keys
-            JsonSerdeUtil.setNode(valueNode, FIELD_PRIMARY, primaryKeys);
-            return JsonSerdeUtil.writeValueAsString(valueNode);
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException(
-                    "An error occurred when automatically attaching the debezium primary keys to Value",
-                    e);
-        }
-    }
-
     /** Transform raw string value according to schema. */
     public static String transformRawValue(
             @Nullable String rawValue,
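Note: the two methods removed above implemented the "attach primary keys to the value" approach that this commit backs out. For reference, here is a minimal standalone sketch of that behavior, using plain Jackson in place of Paimon's shaded JsonSerdeUtil; the class name and the "primary" field string are illustrative assumptions, since the actual value of FIELD_PRIMARY is not shown in this diff.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.util.ArrayList;
import java.util.List;

public class ExtractPrimaryKeysSketch {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Mirrors the removed logic: collect field names from the Debezium key JSON
    // (unwrapping "payload" when a "schema" envelope is present) and attach them
    // to the value JSON. "primary" stands in for FIELD_PRIMARY, whose actual
    // string value is not shown in this diff.
    static String appendPrimaryKeys(String key, String value) throws Exception {
        JsonNode keyNode = MAPPER.readTree(key);
        JsonNode payload = keyNode.has("schema") ? keyNode.get("payload") : keyNode;
        List<String> primaryKeys = new ArrayList<>();
        payload.fieldNames().forEachRemaining(primaryKeys::add);
        ObjectNode valueNode = (ObjectNode) MAPPER.readTree(value);
        valueNode.set("primary", MAPPER.valueToTree(primaryKeys));
        return MAPPER.writeValueAsString(valueNode);
    }

    public static void main(String[] args) throws Exception {
        String key = "{\"id\": 1}";
        String value = "{\"after\": {\"id\": 1, \"name\": \"a\"}, \"op\": \"c\"}";
        // Prints the value with an extra "primary": ["id"] field appended.
        System.out.println(appendPrimaryKeys(key, value));
    }
}

With this logic gone, the Debezium key is no longer folded into the value; as the test change below shows, it instead travels as the Kafka record key.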

This file was deleted.

This file was deleted.

This file was deleted.

@@ -328,8 +328,9 @@ private boolean isRecordLine(String line) {
         }
     }
 
-    private void send(String topic, String record, boolean wait) {
-        Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
+    private void send(String topic, String key, String record, boolean wait) {
+        Future<RecordMetadata> sendFuture =
+                kafkaProducer.send(new ProducerRecord<>(topic, key, record));
         if (wait) {
             try {
                 sendFuture.get();
@@ -339,27 +340,29 @@ private void send(String topic, String record, boolean wait) {
         }
     }
 
-    void writeRecordsToKafka(String topic, Map<String, String> data) throws Exception {
-        Properties producerProperties = getStandardProps();
-        producerProperties.setProperty("retries", "0");
-        producerProperties.put(
-                "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        producerProperties.put(
-                "value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        KafkaProducer kafkaProducer = new KafkaProducer(producerProperties);
-        for (Map.Entry<String, String> entry : data.entrySet()) {
-            try {
-                JsonNode keyNode = objectMapper.readTree(entry.getKey());
-                JsonNode valueNode = objectMapper.readTree(entry.getValue());
-                if (!StringUtils.isEmpty(entry.getValue())) {
-                    kafkaProducer.send(
-                            new ProducerRecord<>(topic, entry.getKey(), entry.getValue()));
-                }
-            } catch (Exception e) {
-                // ignore
-            }
-        }
-        kafkaProducer.close();
-    }
+    private void send(String topic, String record, boolean wait) {
+        Future<RecordMetadata> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
+        if (wait) {
+            try {
+                sendFuture.get();
+            } catch (InterruptedException | ExecutionException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat)
+            throws Exception {
+        URL url =
+                KafkaCanalSyncTableActionITCase.class
+                        .getClassLoader()
+                        .getResource(String.format(resourceDirFormat));
+        List<String> lines = Files.readAllLines(Paths.get(url.toURI()));
+        lines.stream()
+                .map(line -> line.split(";"))
+                .filter(keyValues -> (keyValues.length > 1))
+                .filter(keyValues -> isRecordLine(keyValues[0]) && isRecordLine(keyValues[1]))
+                .forEach(keyValues -> this.send(topic, keyValues[0], keyValues[1], wait));
+    }
 
     /** Kafka container extension for junit5. */
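With the rewritten helper, each fixture line carries the Kafka key and value separated by a semicolon, and both halves must pass isRecordLine before being sent. A hypothetical fixture and call might look like this; the resource path and JSON content are illustrative, not taken from this commit.

// Hypothetical classpath resource kafka/debezium/table/sync/debezium-data-1.txt:
//   {"id": 1};{"after": {"id": 1, "name": "a"}, "op": "c"}
//   {"id": 2};{"after": {"id": 2, "name": "b"}, "op": "c"}
// Each line splits on ';' into (key, value) and is sent with the key as the
// Kafka record key:
writeRecordsToKafka("test_topic", true, "kafka/debezium/table/sync/debezium-data-1.txt");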
