Add debezium-json format apache#2227
sunxiaojian committed Nov 15, 2023
1 parent 30525c1 commit edfb615
Showing 32 changed files with 1,489 additions and 224 deletions.
@@ -21,6 +21,7 @@
 import org.apache.paimon.flink.action.cdc.format.DataFormat;
 import org.apache.paimon.flink.action.cdc.format.RecordParser;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.utils.Pair;
 
 import java.util.Collections;
 import java.util.List;
@@ -92,6 +93,6 @@ public SchemaRetrievalException(String message) {
     /** Wrap the consumer for different message queues. */
     public interface ConsumerWrapper extends AutoCloseable {
 
-        List<String> getRecords(String topic, int pollTimeOutMills);
+        List<Pair<String, String>> getRecords(String topic, int pollTimeOutMills);
     }
 }
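
The new `getRecords` contract surfaces the message key alongside the value, which debezium-json needs because the key JSON carries the primary-key columns of the changed row. Below is a minimal sketch of an implementation against a plain Kafka consumer; `KafkaConsumerWrapper` is an illustrative name, not a class from this commit, and `Pair.of` is assumed to be Paimon's pair factory.

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.paimon.utils.Pair;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class KafkaConsumerWrapper implements ConsumerWrapper {

    private final KafkaConsumer<String, String> consumer;

    KafkaConsumerWrapper(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public List<Pair<String, String>> getRecords(String topic, int pollTimeOutMills) {
        List<Pair<String, String>> result = new ArrayList<>();
        // Keep the record key next to the value: for debezium-json the key
        // JSON holds the primary-key columns of the changed row.
        for (ConsumerRecord<String, String> record :
                consumer.poll(Duration.ofMillis(pollTimeOutMills)).records(topic)) {
            result.add(Pair.of(record.key(), record.value()));
        }
        return result;
    }

    @Override
    public void close() {
        consumer.close();
    }
}
```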
@@ -31,6 +31,7 @@
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.Pair;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.Source;
@@ -137,7 +138,7 @@ public MessageQueueSyncTableActionBase withTypeMapping(TypeMapping typeMapping)
         return this;
     }
 
-    protected abstract Source<String, ?, ?> buildSource();
+    protected abstract Source<Pair, ?, ?> buildSource();
 
     protected abstract String topic();
 
@@ -149,7 +150,7 @@ public MessageQueueSyncTableActionBase withTypeMapping(TypeMapping typeMapping)
 
     @Override
    public void build() throws Exception {
-        Source<String, ?, ?> source = buildSource();
+        Source<Pair, ?, ?> source = buildSource();
 
         catalog.createDatabase(database, true);
         boolean caseSensitive = catalog.caseSensitive();
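With `buildSource()` now returning `Source<Pair, ?, ?>`, a subclass has to hand Flink a source that deserializes both sides of the Kafka record. A sketch of one way to do that with Flink's `KafkaSource`; the schema class name, topic wiring, and bootstrap address are illustrative, not taken from the commit.

```java
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.paimon.utils.Pair;

import java.nio.charset.StandardCharsets;

class KeyValueDeserializationSchema implements KafkaRecordDeserializationSchema<Pair> {

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<Pair> out) {
        // Both sides are nullable: compacted topics can carry tombstones and
        // some producers write keyless records.
        String key =
                record.key() == null
                        ? null
                        : new String(record.key(), StandardCharsets.UTF_8);
        String value =
                record.value() == null
                        ? null
                        : new String(record.value(), StandardCharsets.UTF_8);
        out.collect(Pair.of(key, value));
    }

    @Override
    public TypeInformation<Pair> getProducedType() {
        return TypeInformation.of(Pair.class);
    }
}

// Illustrative buildSource() against the new signature:
// Source<Pair, ?, ?> buildSource() {
//     return KafkaSource.<Pair>builder()
//             .setBootstrapServers("localhost:9092")
//             .setTopics(topic())
//             .setDeserializer(new KeyValueDeserializationSchema())
//             .build();
// }
```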
@@ -21,7 +21,7 @@
 import org.apache.paimon.flink.action.cdc.ComputedColumn;
 import org.apache.paimon.flink.action.cdc.TypeMapping;
 import org.apache.paimon.flink.action.cdc.format.canal.CanalRecordParser;
-import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumRecordParser;
+import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumJsonRecordParser;
 import org.apache.paimon.flink.action.cdc.format.maxwell.MaxwellRecordParser;
 import org.apache.paimon.flink.action.cdc.format.ogg.OggRecordParser;
 
@@ -38,7 +38,7 @@ public enum DataFormat {
     CANAL_JSON(CanalRecordParser::new),
     OGG_JSON(OggRecordParser::new),
     MAXWELL_JSON(MaxwellRecordParser::new),
-    DEBEZIUM_JSON(DebeziumRecordParser::new);
+    DEBEZIUM_JSON(DebeziumJsonRecordParser::new);
     // Add more data formats here if needed
 
     private final RecordParserFactory parser;
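Format resolution itself is unchanged; `debezium-json` now simply maps to the renamed parser. A hedged sketch of how the option string could reach the enum, assuming the usual dash-to-underscore, upper-case convention; `fromOptionString` is a hypothetical helper, not a method from this commit.

```java
// Hypothetical helper: "debezium-json" -> DataFormat.DEBEZIUM_JSON, whose
// RecordParserFactory produces a DebeziumJsonRecordParser.
static DataFormat fromOptionString(String format) {
    return DataFormat.valueOf(format.replace("-", "_").toUpperCase());
}
```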
@@ -27,6 +27,7 @@
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.StringUtils;
 
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
@@ -68,10 +69,13 @@
  * Subclasses are expected to provide specific implementations for extracting records, validating
  * message formats, and other format-specific operations.
  */
-public abstract class RecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord> {
+public abstract class RecordParser implements FlatMapFunction<Pair, RichCdcMultiplexRecord> {
 
     protected static final String FIELD_TABLE = "table";
     protected static final String FIELD_DATABASE = "database";
+
+    protected static final String FIELD_RECORD_KEY = "key";
+    protected static final String FIELD_RECORD_VALUE = "value";
     protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     protected final boolean caseSensitive;
     protected final TypeMapping typeMapping;
@@ -89,13 +93,18 @@ public RecordParser(
         this.computedColumns = computedColumns;
     }
 
+    @Nullable
+    public Schema buildSchema(Pair<String, String> record) {
+        this.parseKeyIfNeed(record.getKey());
+        return this.buildSchema(record.getValue());
+    }
+
     @Nullable
     public Schema buildSchema(String record) {
         this.parseRootJson(record);
         if (BooleanUtils.isTrue(this.isDDL())) {
             return null;
         }
-
         tableName = extractStringFromRootJson(FIELD_TABLE);
         this.validateFormat();
         this.extractPrimaryKeys();
@@ -155,10 +164,9 @@ protected LinkedHashMap<String, DataType> setPaimonFieldType() {
     }
 
     @Override
-    public void flatMap(String value, Collector<RichCdcMultiplexRecord> out) throws Exception {
-        root = OBJECT_MAPPER.readValue(value, JsonNode.class);
+    public void flatMap(Pair record, Collector<RichCdcMultiplexRecord> out) throws Exception {
+        root = OBJECT_MAPPER.readValue((String) record.getValue(), JsonNode.class);
         this.validateFormat();
-
         databaseName = extractStringFromRootJson(FIELD_DATABASE);
         tableName = extractStringFromRootJson(FIELD_TABLE);
 
@@ -231,7 +239,9 @@ protected RichCdcMultiplexRecord createRecord(
                 new CdcRecord(rowKind, data));
     }
 
-    protected final void parseRootJson(String record) {
+    protected void parseKeyIfNeed(String key) {}
+
+    protected void parseRootJson(String record) {
         try {
             root = OBJECT_MAPPER.readValue(record, JsonNode.class);
         } catch (JsonProcessingException e) {
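To see the new `Pair`-based entry points in action, here is an illustrative call with trimmed debezium-json payloads; the JSON is an example, not data from the commit, and `parser` stands in for any concrete `RecordParser`.

```java
// Key JSON carries the primary-key columns; value JSON carries the change.
String key = "{\"id\": 1}";
String value =
        "{\"before\": null,"
                + " \"after\": {\"id\": 1, \"name\": \"paimon\"},"
                + " \"source\": {\"db\": \"test_db\", \"table\": \"test_table\"},"
                + " \"op\": \"c\"}";

// parseKeyIfNeed(key) runs first (a no-op by default, overridden for
// debezium-json), then the value is parsed to derive the table schema.
Schema schema = parser.buildSchema(Pair.of(key, value));
```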
@@ -0,0 +1,218 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.paimon.flink.action.cdc.format.debezium;

import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;

import org.apache.commons.lang3.BooleanUtils;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
 * The {@code DebeziumJsonRecordParser} class extends the abstract {@link RecordParser} and is
 * designed to parse records from Debezium's JSON change data capture (CDC) format. Debezium is a
 * CDC tool that captures row-level changes from databases such as MySQL and emits them in JSON
 * format. This parser extracts the relevant information from the debezium-json format and converts
 * it into a list of {@link RichCdcMultiplexRecord} objects.
 *
 * <p>The class supports various database operations such as INSERT, UPDATE, and DELETE, and creates
 * corresponding {@link RichCdcMultiplexRecord} objects to represent these changes.
 *
 * <p>Validation is performed to ensure that the JSON records contain all necessary fields, and the
 * class also supports schema extraction for the Kafka topic.
 */
public class DebeziumJsonRecordParser extends RecordParser {

    private static final String RECORD_PAYLOAD = "payload";
    private static final String RECORD_SCHEMA = "schema";
    private static final String RECORD_SOURCE = "source";
    private static final String RECORD_SOURCE_DB = "db";
    private static final String FIELD_BEFORE = "before";
    private static final String OP_FIELD = "op";
    private static final String OP_READ = "r"; // snapshot read
    private static final String OP_CREATE = "c"; // insert
    private static final String OP_UPDATE = "u"; // update
    private static final String OP_DELETE = "d"; // delete

    protected JsonNode keyPayload;
    protected JsonNode source;

    public DebeziumJsonRecordParser(
            boolean caseSensitive, TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(caseSensitive, typeMapping, computedColumns);
    }

    @Override
    protected Boolean isDDL() {
        return root.has("tableChanges");
    }

    @Override
    protected List<RichCdcMultiplexRecord> extractRecords() {
        List<RichCdcMultiplexRecord> records = new ArrayList<>();
        // skip ddl
        if (BooleanUtils.isTrue(this.isDDL())) {
            return Collections.emptyList();
        }
        String operation = extractStringFromRootJson(OP_FIELD);
        switch (operation) {
            case OP_CREATE:
            case OP_READ:
                processRecord(root.get(dataField()), RowKind.INSERT, records);
                break;
            case OP_UPDATE:
                processRecord(root.get(FIELD_BEFORE), RowKind.DELETE, records);
                processRecord(root.get(dataField()), RowKind.INSERT, records);
                break;
            case OP_DELETE:
                processRecord(root.get(FIELD_BEFORE), RowKind.DELETE, records);
                break;
            default:
                throw new UnsupportedOperationException("Unknown record operation: " + operation);
        }
        return records;
    }

    @Override
    protected void validateFormat() {
        String errorMessageTemplate =
                "Didn't find '%s' node in json. Please make sure your topic's format is correct.";
        checkArgument(!isNull(keyPayload), errorMessageTemplate, FIELD_RECORD_KEY);
        checkArgument(!isNull(source), errorMessageTemplate, RECORD_SOURCE);
        checkArgument(!isNull(source.get(FIELD_TABLE)), errorMessageTemplate, FIELD_TABLE);
        checkArgument(
                !isNull(source.get(RECORD_SOURCE_DB)), errorMessageTemplate, RECORD_SOURCE_DB);
        checkArgument(!isNull(root.get(OP_FIELD)), errorMessageTemplate, OP_FIELD);
        if (root.get(OP_FIELD).asText().equals(OP_DELETE)) {
            checkArgument(!isNull(root.get(FIELD_BEFORE)), errorMessageTemplate, FIELD_BEFORE);
        } else {
            checkArgument(!isNull(root.get(dataField())), errorMessageTemplate, dataField());
        }
    }

    @Override
    protected String primaryField() {
        // No-op
        return null;
    }

    @Override
    protected String dataField() {
        return "after";
    }

    @Override
    protected void extractPrimaryKeys() {
        primaryKeys = Lists.newArrayList(keyPayload.fieldNames());
    }

    @Override
    public void flatMap(Pair record, Collector<RichCdcMultiplexRecord> out) throws Exception {
        // Extract the payload field from the Debezium record key
        if (Objects.nonNull(record.getKey())) {
            this.extractKeyPayload(
                    OBJECT_MAPPER.readValue((String) record.getKey(), JsonNode.class));
        }
        // Extract the payload field from the Debezium record value
        if (Objects.nonNull(record.getRight())) {
            this.extractValuePayload(
                    OBJECT_MAPPER.readValue((String) record.getValue(), JsonNode.class));
        }
        this.validateFormat();
        databaseName = extractStringFromRootJson(FIELD_DATABASE);
        tableName = extractStringFromRootJson(FIELD_TABLE);
        extractRecords().forEach(out::collect);
    }

    @Override
    protected void parseKeyIfNeed(String key) {
        if (StringUtils.isBlank(key)) {
            return;
        }
        try {
            // Used to extract the primary key
            JsonNode recordData = OBJECT_MAPPER.readValue(key, JsonNode.class);
            extractKeyPayload(recordData);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Error processing record key JSON: " + key, e);
        }
    }

    private void extractKeyPayload(JsonNode recordData) {
        if (includeSchema(recordData)) {
            keyPayload = recordData.get(RECORD_PAYLOAD);
        } else {
            keyPayload = recordData;
        }
    }

    @Override
    protected void parseRootJson(String record) {
        try {
            JsonNode recordData = OBJECT_MAPPER.readValue(record, JsonNode.class);
            extractValuePayload(recordData);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Error processing JSON: " + record, e);
        }
    }

    private void extractValuePayload(JsonNode recordData) {
        // Used to extract the record value
        if (includeSchema(recordData)) {
            root = recordData.get(RECORD_PAYLOAD);
        } else {
            root = recordData;
        }
        // Used to extract record metadata
        source = root.get(RECORD_SOURCE);
    }

    @Override
    protected String extractStringFromRootJson(String key) {
        if (key.equals(FIELD_TABLE)) {
            tableName = source.get(FIELD_TABLE).asText();
            return tableName;
        } else if (key.equals(FIELD_DATABASE)) {
            this.databaseName = source.get(RECORD_SOURCE_DB).asText();
            return databaseName;
        }
        return root.get(key) != null ? root.get(key).asText() : null;
    }

    private boolean includeSchema(JsonNode record) {
        return record.size() == 2 && record.has(RECORD_SCHEMA) && record.has(RECORD_PAYLOAD);
    }
}
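
For reference, here are the two value layouts `includeSchema(...)` distinguishes, and what an update turns into; the JSON is trimmed and illustrative, not data from the commit.

```java
// Bare payload, e.g. when the Kafka Connect JSON converter is configured
// with value.converter.schemas.enable=false:
String bare =
        "{\"before\": {\"id\": 1, \"name\": \"old\"},"
                + " \"after\": {\"id\": 1, \"name\": \"new\"},"
                + " \"source\": {\"db\": \"test_db\", \"table\": \"test_table\"},"
                + " \"op\": \"u\"}";

// Enveloped payload: exactly two top-level fields, "schema" and "payload",
// so includeSchema(...) unwraps to the payload node.
String enveloped = "{\"schema\": {}, \"payload\": " + bare + "}";

// For op "u", extractRecords() emits two rows: RowKind.DELETE built from
// "before", then RowKind.INSERT built from "after".
```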