
Commit b871983

fixed

sunxiaojian committed Nov 28, 2023
1 parent dbd1e07

Showing 4 changed files with 20 additions and 95 deletions.
@@ -24,7 +24,6 @@
 import org.apache.paimon.flink.action.cdc.format.DebeziumEvent;
 import org.apache.paimon.flink.sink.cdc.CdcRecord;
 import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
-import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.Preconditions;
@@ -34,9 +33,6 @@
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
-import io.debezium.relational.Column;
-import io.debezium.relational.Table;
-import io.debezium.relational.history.TableChanges;
 import io.debezium.time.Conversions;
 import io.debezium.time.Date;
 import io.debezium.time.MicroTime;
@@ -66,20 +62,14 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 
-import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnCaseConvertAndDuplicateCheck;
-import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.columnDuplicateErrMsg;
-import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.listCaseConvert;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.mapKeyCaseConvert;
 import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.recordKeyDuplicateErrMsg;
-import static org.apache.paimon.flink.action.cdc.TypeMapping.TypeMappingMode.TO_NULLABLE;
 import static org.apache.paimon.utils.JsonSerdeUtil.isNull;
 
 /**
@@ -96,6 +86,7 @@ public class SqlServerRecordParser implements FlatMapFunction<String, RichCdcMultiplexRecord> {
     private final TypeMapping typeMapping;
     private DebeziumEvent root;
     private String currentTable;
+    private String schemaName;
     private String databaseName;
     private final Set<String> nonPkTables = new HashSet<>();
     private final CdcMetadataConverter[] metadataConverters;
@@ -138,61 +129,18 @@ public SqlServerRecordParser(
     public void flatMap(String rawEvent, Collector<RichCdcMultiplexRecord> out) throws Exception {
         this.root = objectMapper.readValue(rawEvent, DebeziumEvent.class);
         this.currentTable = root.payload().source().table();
+        this.schemaName = root.payload().source().schema();
         this.databaseName = root.payload().source().db();
         if (nonPkTables.contains(currentTable)) {
             return;
         }
         if (root.payload().isSchemaChange()) {
-            extractSchemaChange().forEach(out::collect);
-            return;
+            throw new UnsupportedOperationException(
+                    "Temporarily not supporting schema change events");
         }
         convertRecords().forEach(out::collect);
     }
 
-    private List<RichCdcMultiplexRecord> extractSchemaChange() {
-        DebeziumEvent.Payload payload = root.payload();
-        if (!payload.hasHistoryRecord()) {
-            return Collections.emptyList();
-        }
-        TableChanges.TableChange tableChange = null;
-        try {
-            Iterator<TableChanges.TableChange> tableChanges = payload.getTableChanges();
-            if (!tableChanges.hasNext()) {
-                LOG.error(
-                        "Invalid historyRecord, because tableChanges should contain exactly 1 item.\n"
-                                + payload.historyRecord());
-                return Collections.emptyList();
-            }
-            while (tableChanges.hasNext()) {
-                tableChange = tableChanges.next();
-            }
-            if (TableChanges.TableChangeType.CREATE == tableChange.getType()
-                    && tableChange.getTable().primaryKeyColumnNames().isEmpty()) {
-                LOG.error(
-                        "Didn't find primary keys from SqlServer DDL for table '{}'. "
-                                + "This table won't be synchronized.",
-                        currentTable);
-                nonPkTables.add(currentTable);
-                return Collections.emptyList();
-            }
-        } catch (Exception e) {
-            LOG.error("Failed to parse history record for schema changes", e);
-            return Collections.emptyList();
-        }
-
-        Table table = tableChange.getTable();
-        LinkedHashMap<String, DataType> fieldTypes = convertFieldTypes(table);
-        List<String> primaryKeys = listCaseConvert(table.primaryKeyColumnNames(), caseSensitive);
-
-        return Collections.singletonList(
-                new RichCdcMultiplexRecord(
-                        databaseName,
-                        currentTable,
-                        fieldTypes,
-                        primaryKeys,
-                        CdcRecord.emptyRecord()));
-    }
-
     private List<RichCdcMultiplexRecord> convertRecords() {
         List<RichCdcMultiplexRecord> records = Lists.newArrayList();
         Map<String, String> before = extractRow(root.payload().before());
@@ -208,28 +156,6 @@ private List<RichCdcMultiplexRecord> convertRecords() {
         return records;
     }
 
-    private LinkedHashMap<String, DataType> convertFieldTypes(Table table) {
-        List<Column> columns = table.columns();
-        LinkedHashMap<String, DataType> fieldTypes = new LinkedHashMap<>(columns.size());
-        Set<String> existedFields = new HashSet<>();
-        Function<String, String> columnDuplicateErrMsg =
-                columnDuplicateErrMsg(table.id().toString());
-        for (Column column : columns) {
-            String columnName =
-                    columnCaseConvertAndDuplicateCheck(
-                            column.name(), existedFields, caseSensitive, columnDuplicateErrMsg);
-            DataType dataType =
-                    SqlServerTypeUtils.toPaimonDataType(
-                            column.typeExpression(),
-                            column.length(),
-                            column.scale().orElse(null),
-                            typeMapping);
-            dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || column.isOptional());
-            fieldTypes.put(columnName, dataType);
-        }
-        return fieldTypes;
-    }
-
     private Map<String, String> extractRow(JsonNode record) {
         if (record == null) {
             return new HashMap<>();
Expand Down Expand Up @@ -348,7 +274,7 @@ private boolean isSchemaBytes(String schemaType) {

protected RichCdcMultiplexRecord createRecord(RowKind rowKind, Map<String, String> data) {
return new RichCdcMultiplexRecord(
databaseName,
databaseName + "_" + schemaName,
currentTable,
new LinkedHashMap<>(0),
Collections.emptyList(),
Expand Down
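Note on the diff above: with schemaName captured from the Debezium source, createRecord now
namespaces the Paimon database as "<database>_<schema>", and schema-change events fail fast
instead of being converted. A minimal sketch of the naming rule; the class and main harness
below are illustrative stand-ins, not Paimon code:

    // Sketch only: mirrors the databaseName + "_" + schemaName composition in the diff.
    public class RecordNamingSketch {

        static String paimonDatabaseName(String databaseName, String schemaName) {
            // e.g. "shard_database" + "_" + "dbo" -> "shard_database_dbo"
            return databaseName + "_" + schemaName;
        }

        public static void main(String[] args) {
            // Matches the table name asserted in the ITCase below: shard_database_dbo_t2.
            System.out.println(paimonDatabaseName("shard_database", "dbo") + "_t2");
        }
    }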
@@ -27,7 +27,8 @@
 
 /** Copy from org.testcontainers.utility.LicenseAcceptance, modify license location. */
 public final class LicenseAcceptanceUtils {
-    private static final String ACCEPTANCE_FILE_NAME = "META-INF/container-license-acceptance.txt";
+    private static final String ACCEPTANCE_FILE_NAME =
+            "META-INF/licenses/container-license-acceptance.txt";
 
     public static void assertLicenseAccepted(String imageName) {
         try {
@@ -45,9 +46,9 @@ public static void assertLicenseAccepted(String imageName) {
                     "The image "
                             + imageName
                             + " requires you to accept a license agreement. Please place a file at the root of the classpath named "
-                            + "META-INF/services/container-license-acceptance.txt"
+                            + "META-INF/licenses/container-license-acceptance.txt"
                             + ", e.g. at src/test/resources/"
-                            + "META-INF/container-license-acceptance.txt"
+                            + "META-INF/licenses/container-license-acceptance.txt"
                             + ". This file should contain the line:\n "
                             + imageName);
         }
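For context, this utility (copied from org.testcontainers.utility.LicenseAcceptance) checks
whether an image name is listed in an acceptance file on the classpath; this commit only moves
that file under META-INF/licenses/. A minimal sketch of the lookup, assuming one image name
per line; the class name and boolean return here are illustrative, not the copied utility's API:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;

    public final class LicenseCheckSketch {
        private static final String ACCEPTANCE_FILE_NAME =
                "META-INF/licenses/container-license-acceptance.txt";

        // Returns true if the acceptance file exists on the classpath and
        // contains a line equal to the given image name.
        public static boolean isLicenseAccepted(String imageName) {
            try (InputStream in =
                    Thread.currentThread()
                            .getContextClassLoader()
                            .getResourceAsStream(ACCEPTANCE_FILE_NAME)) {
                if (in == null) {
                    return false; // no acceptance file packaged with the tests
                }
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                return reader.lines().map(String::trim).anyMatch(imageName::equals);
            } catch (Exception e) {
                return false;
            }
        }
    }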
@@ -67,19 +67,17 @@ public void testActionRunResult() throws Exception {
                         "shard_database_dbo_taa",
                         "shard_database_dbo_s2");
 
-        if (mode == COMBINED) {
-            try (Statement statement = getStatement()) {
-                // ensure the job steps into incremental phase
-                statement.executeUpdate("USE shard_database");
-                statement.executeUpdate("INSERT INTO t2 VALUES (1, 'A')");
-                waitForResult(
-                        Collections.singletonList("+I[1, A]"),
-                        getFileStoreTable("shard_database_dbo_t2"),
-                        RowType.of(
-                                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(100)},
-                                new String[] {"k", "name"}),
-                        Collections.singletonList("k"));
-            }
-        }
+        try (Statement statement = getStatement()) {
+            // ensure the job steps into incremental phase
+            statement.executeUpdate("USE shard_database");
+            statement.executeUpdate("INSERT INTO t2 VALUES (1, 'A')");
+        }
+        waitForResult(
+                Collections.singletonList("+I[1, A]"),
+                getFileStoreTable("shard_database_dbo_t2"),
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(100)},
+                        new String[] {"k", "name"}),
+                Collections.singletonList("k"));
     }
 }
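The test now writes the incremental rows and closes the JDBC Statement before polling for the
sync result, instead of holding it open for the whole wait (and it no longer guards the check
behind COMBINED mode). A sketch of that scoping pattern; the JDBC URL and credentials are
placeholders, and the trailing comment stands in for the ITCase's waitForResult helper:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class ScopedStatementSketch {
        public static void main(String[] args) throws Exception {
            try (Connection conn =
                            DriverManager.getConnection(
                                    "jdbc:sqlserver://localhost:1433", "sa", "<password>");
                    Statement statement = conn.createStatement()) {
                statement.executeUpdate("USE shard_database");
                statement.executeUpdate("INSERT INTO t2 VALUES (1, 'A')");
            } // statement and connection released here, before any waiting
            // ... poll the Paimon table for "+I[1, A]" here (waitForResult in the ITCase).
        }
    }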
