Skip to content

Commit

Permalink
[FLINK-35912][cdc-connector] SqlServer CDC doesn't chunk UUID-typed c…
Browse files Browse the repository at this point in the history
…olumns correctly (apache#3497)

* resolve conflicts

* polish code to trigger ci

---------

Co-authored-by: Kael <kael@fts.dev>
Co-authored-by: gongzhongqiang <gongzhongqiang@gigacloudtech.com>
  • Loading branch information
3 people authored and qiaozongmi committed Sep 23, 2024
1 parent e393297 commit 4460b98
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
}

/** ChunkEnd less than or equal to max. */
protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
return ObjectUtils.compare(chunkEnd, max) <= 0;
}

/** ChunkEnd greater than or equal to max. */
protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
return ObjectUtils.compare(chunkEnd, max) >= 0;
}

Expand Down Expand Up @@ -368,7 +368,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDOS on the database server
Expand Down Expand Up @@ -397,7 +397,7 @@ private Object nextChunkEnd(
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (isChunkEndGeMax(chunkEnd, max)) {
if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
return null;
} else {
return chunkEnd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected boolean isEvenlySplitColumn(Column splitColumn) {

/** ChunkEnd less than or equal to max. */
@Override
protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
Expand All @@ -116,7 +116,7 @@ protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {

/** ChunkEnd greater than or equal to max. */
@Override
protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,13 @@ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}

/**
 * ChunkEnd less than or equal to max.
 *
 * <p>The comparison is delegated to {@link SqlServerUtils#compare(Object, Object, Column)} so
 * that the split column's type is taken into account when ordering the two values.
 *
 * @param chunkEnd the candidate chunk end value
 * @param max the maximum value of the split column
 * @param splitColumn the column the table is being chunked on
 * @return {@code true} if {@code chunkEnd} sorts at or before {@code max}
 */
@Override
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
    return SqlServerUtils.compare(chunkEnd, max, splitColumn) <= 0;
}

/**
 * ChunkEnd greater than or equal to max.
 *
 * <p>The comparison is delegated to {@link SqlServerUtils#compare(Object, Object, Column)} so
 * that the split column's type is taken into account when ordering the two values.
 *
 * @param chunkEnd the candidate chunk end value
 * @param max the maximum value of the split column
 * @param splitColumn the column the table is being chunked on
 * @return {@code true} if {@code chunkEnd} sorts at or after {@code max}
 */
@Override
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
    return SqlServerUtils.compare(chunkEnd, max, splitColumn) >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
/** Utilities for converting from SqlServer types to Flink types. */
public class SqlServerTypeUtils {

/** Type name that SQL Server reports for GUID ({@code uniqueidentifier}) columns. */
static final String UNIQUEIDENTIFIRER = "uniqueidentifier";

/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.sqlserver.source.utils;

import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.sqlserver.source.offset.LsnOffset;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
Expand All @@ -40,14 +41,17 @@

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.DataTypes.FIELD;
Expand Down Expand Up @@ -297,8 +301,7 @@ private static String buildSplitQuery(
return buildSelectWithRowLimits(
tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
} else {
final String orderBy =
pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
final String orderBy = String.join(", ", pkRowType.getFieldNames());
return buildSelectWithBoundaryRowLimits(
tableId,
limitSize,
Expand All @@ -322,7 +325,7 @@ private static String getMaxPrimaryKeyColumnsProjection(RowType pkRowType) {
StringBuilder sql = new StringBuilder();
for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
fieldNamesIt.hasNext(); ) {
sql.append("MAX(" + fieldNamesIt.next() + ")");
sql.append("MAX(").append(fieldNamesIt.next()).append(")");
if (fieldNamesIt.hasNext()) {
sql.append(" , ");
}
Expand All @@ -342,12 +345,8 @@ private static String buildSelectWithRowLimits(
}
sql.append(projection).append(" FROM ");
sql.append(quoteSchemaAndTable(tableId));
if (condition.isPresent()) {
sql.append(" WHERE ").append(condition.get());
}
if (orderBy.isPresent()) {
sql.append(" ORDER BY ").append(orderBy.get());
}
condition.ifPresent(s -> sql.append(" WHERE ").append(s));
orderBy.ifPresent(s -> sql.append(" ORDER BY ").append(s));
return sql.toString();
}

Expand Down Expand Up @@ -396,11 +395,54 @@ private static String buildSelectWithBoundaryRowLimits(
sql.append(projection);
sql.append(" FROM ");
sql.append(quoteSchemaAndTable(tableId));
if (condition.isPresent()) {
sql.append(" WHERE ").append(condition.get());
}
condition.ifPresent(s -> sql.append(" WHERE ").append(s));
sql.append(" ORDER BY ").append(orderBy);
sql.append(") T");
return sql.toString();
}

/**
 * Compares two split-key values of the given column.
 *
 * <p>When the column's type name equals {@link SqlServerTypeUtils#UNIQUEIDENTIFIRER}, both
 * values are converted to {@link UUID} through their string form and ordered with {@link
 * SQLServerUUIDComparator}. Every other column type falls back to {@link
 * ObjectUtils#compare(Object, Object)}.
 *
 * @param obj1 the first value
 * @param obj2 the second value
 * @param splitColumn the column both values belong to
 * @return a negative number, zero, or a positive number if {@code obj1} sorts before, equal
 *     to, or after {@code obj2}
 */
public static int compare(Object obj1, Object obj2, Column splitColumn) {
    final boolean guidColumn =
            splitColumn.typeName().equals(SqlServerTypeUtils.UNIQUEIDENTIFIRER);
    if (!guidColumn) {
        return ObjectUtils.compare(obj1, obj2);
    }

    final UUID left = UUID.fromString(obj1.toString());
    final UUID right = UUID.fromString(obj2.toString());
    return new SQLServerUUIDComparator().compare(left, right);
}

/**
 * Comparator for SQL Server UUIDs. SQL Server compares UUIDs in a different order than Java.
 *
 * <p>SQL Server ranks the five dash-separated groups of a GUID from right to left: the trailing
 * six-byte group is the most significant and the leading four-byte group is the least
 * significant. The reference implementation expresses this as an index table over the .NET
 * {@code Guid} byte array, in which the first three groups are stored little-endian. This class
 * instead works on the big-endian bytes of a {@link UUID} (the same order as its string form),
 * so the indices inside the first three groups are reversed relative to the .NET table.
 *
 * <p>Reference code: <a
 * href="https://github.com/dotnet/runtime/blob/5535e31a712343a63f5d7d796cd874e563e5ac14/src/libraries/System.Data.Common/src/System/Data/SQLTypes/SQLGuid.cs#L113">SQLGuid.cs::CompareTo</a>
 * Reference doc: <a
 * href="https://learn.microsoft.com/uk-ua/sql/connect/ado-net/sql/compare-guid-uniqueidentifier-values?view=sql-server-ver16">Comparing
 * GUID and uniqueidentifier values</a>
 */
static class SQLServerUUIDComparator implements Comparator<UUID> {

    private static final int SIZE_OF_GUID = 16;

    /**
     * Big-endian byte positions ordered from most to least significant for SQL Server. The .NET
     * table is {10..15, 8, 9, 6, 7, 4, 5, 0, 1, 2, 3} over a mixed-endian array; translating
     * the little-endian groups to big-endian positions gives 7,6 / 5,4 / 3,2,1,0.
     */
    private static final byte[] GUID_ORDER = {
        10, 11, 12, 13, 14, 15, 8, 9, 7, 6, 5, 4, 3, 2, 1, 0
    };

    /**
     * Compares two UUIDs using SQL Server's {@code uniqueidentifier} ordering.
     *
     * @param uuid1 the first UUID
     * @param uuid2 the second UUID
     * @return a negative number, zero, or a positive number if {@code uuid1} sorts before,
     *     equal to, or after {@code uuid2}
     */
    @Override
    public int compare(UUID uuid1, UUID uuid2) {
        byte[] bytes1 = uuidToBytes(uuid1);
        byte[] bytes2 = uuidToBytes(uuid2);

        for (int i = 0; i < SIZE_OF_GUID; i++) {
            int b1 = bytes1[GUID_ORDER[i]] & 0xFF; // mask to compare as unsigned bytes
            int b2 = bytes2[GUID_ORDER[i]] & 0xFF;
            if (b1 != b2) {
                return Integer.compare(b1, b2);
            }
        }
        return 0;
    }

    /** Returns the 16 bytes of the UUID in big-endian order (matching its string form). */
    private static byte[] uuidToBytes(UUID uuid) {
        return ByteBuffer.allocate(SIZE_OF_GUID)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.sqlserver.source.utils;

import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;

/** Unit test for {@link SqlServerUtils.SQLServerUUIDComparator}. */
public class SQLServerUUIDComparatorTest {

    /**
     * Sorts the same three GUIDs once with the generic {@link ObjectUtils#compare} and once with
     * the SQL Server comparator, and checks that the two orderings differ as expected.
     */
    @Test
    public void testComparator() {
        SqlServerUtils.SQLServerUUIDComparator comparator =
                new SqlServerUtils.SQLServerUUIDComparator();
        // Create an ArrayList and fill it with Guid values.
        List<UUID> guidList = new ArrayList<>();
        guidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"));
        guidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"));
        guidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"));

        // Sort the Guids.
        guidList.sort(ObjectUtils::compare);

        // assertEquals takes (expected, actual); keep that order so failure messages are correct.
        assertEquals(
                "1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE", guidList.get(0).toString().toUpperCase());
        assertEquals(
                "2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE", guidList.get(1).toString().toUpperCase());
        assertEquals(
                "3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE", guidList.get(2).toString().toUpperCase());

        // Create an ArrayList of SqlGuids.
        List<UUID> sqlGuidList = new ArrayList<>();
        sqlGuidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"));
        sqlGuidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"));
        sqlGuidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"));

        // Sort the SqlGuids. The unsorted SqlGuids are in the same order
        // as the unsorted Guid values.
        sqlGuidList.sort(comparator);

        // The sorted SqlGuid values are ordered differently than the Guid values:
        // the trailing six-byte group decides the order.
        assertEquals(
                "2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE",
                sqlGuidList.get(0).toString().toUpperCase());
        assertEquals(
                "3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE",
                sqlGuidList.get(1).toString().toUpperCase());
        assertEquals(
                "1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE",
                sqlGuidList.get(2).toString().toUpperCase());
    }
}

0 comments on commit 4460b98

Please sign in to comment.