Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35912] SqlServer CDC doesn't chunk UUID-typed columns correctly #3497

Merged
merged 2 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,12 +214,12 @@ protected Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
}

/** ChunkEnd less than or equal to max. */
protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
return ObjectUtils.compare(chunkEnd, max) <= 0;
}

/** ChunkEnd greater than or equal to max. */
protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
return ObjectUtils.compare(chunkEnd, max) >= 0;
}

Expand Down Expand Up @@ -368,7 +368,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
Object chunkStart = null;
Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize);
int count = 0;
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max)) {
while (chunkEnd != null && isChunkEndLeMax(chunkEnd, max, splitColumn)) {
// we start from [null, min + chunk_size) and avoid [null, min)
splits.add(ChunkRange.of(chunkStart, chunkEnd));
// may sleep a while to avoid DDoS-ing the source database server
Expand Down Expand Up @@ -397,7 +397,7 @@ private Object nextChunkEnd(
// should query the next one larger than chunkEnd
chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd);
}
if (isChunkEndGeMax(chunkEnd, max)) {
if (isChunkEndGeMax(chunkEnd, max, splitColumn)) {
return null;
} else {
return chunkEnd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ protected boolean isEvenlySplitColumn(Column splitColumn) {

/** ChunkEnd less than or equal to max. */
@Override
protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
Expand All @@ -116,7 +116,7 @@ protected boolean isChunkEndLeMax(Object chunkEnd, Object max) {

/** ChunkEnd greater than or equal to max. */
@Override
protected boolean isChunkEndGeMax(Object chunkEnd, Object max) {
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
boolean chunkEndMaxCompare;
if (chunkEnd instanceof ROWID && max instanceof ROWID) {
chunkEndMaxCompare =
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we could add some test cases to verify the UUID comparison logic. Test cases from the Microsoft docs would be ideal.

Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,13 @@ protected Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
return SqlServerUtils.queryApproximateRowCnt(jdbc, tableId);
}

/**
 * ChunkEnd less than or equal to max.
 *
 * <p>The ordering is delegated to {@code SqlServerUtils.compare}, which receives the split
 * column alongside the two values.
 */
protected boolean isChunkEndLeMax(Object chunkEnd, Object max, Column splitColumn) {
    final int comparison = SqlServerUtils.compare(chunkEnd, max, splitColumn);
    return comparison <= 0;
}

/**
 * ChunkEnd greater than or equal to max.
 *
 * <p>The ordering is delegated to {@code SqlServerUtils.compare}, which receives the split
 * column alongside the two values.
 */
protected boolean isChunkEndGeMax(Object chunkEnd, Object max, Column splitColumn) {
    final int comparison = SqlServerUtils.compare(chunkEnd, max, splitColumn);
    return comparison >= 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
/** Utilities for converting from SqlServer types to Flink types. */
public class SqlServerTypeUtils {

/** Type name of SQL Server's GUID column type. */
static final String UNIQUEIDENTIFIRER = "uniqueidentifier";

/** Returns a corresponding Flink data type from a debezium {@link Column}. */
public static DataType fromDbzColumn(Column column) {
DataType dataType = convertFromColumn(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.cdc.connectors.sqlserver.source.utils;

import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.sqlserver.source.offset.LsnOffset;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
Expand All @@ -40,14 +41,17 @@

import javax.annotation.Nullable;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.flink.table.api.DataTypes.FIELD;
Expand Down Expand Up @@ -297,8 +301,7 @@ private static String buildSplitQuery(
return buildSelectWithRowLimits(
tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
} else {
final String orderBy =
pkRowType.getFieldNames().stream().collect(Collectors.joining(", "));
final String orderBy = String.join(", ", pkRowType.getFieldNames());
return buildSelectWithBoundaryRowLimits(
tableId,
limitSize,
Expand All @@ -322,7 +325,7 @@ private static String getMaxPrimaryKeyColumnsProjection(RowType pkRowType) {
StringBuilder sql = new StringBuilder();
for (Iterator<String> fieldNamesIt = pkRowType.getFieldNames().iterator();
fieldNamesIt.hasNext(); ) {
sql.append("MAX(" + fieldNamesIt.next() + ")");
sql.append("MAX(").append(fieldNamesIt.next()).append(")");
if (fieldNamesIt.hasNext()) {
sql.append(" , ");
}
Expand All @@ -342,12 +345,8 @@ private static String buildSelectWithRowLimits(
}
sql.append(projection).append(" FROM ");
sql.append(quoteSchemaAndTable(tableId));
if (condition.isPresent()) {
sql.append(" WHERE ").append(condition.get());
}
if (orderBy.isPresent()) {
sql.append(" ORDER BY ").append(orderBy.get());
}
condition.ifPresent(s -> sql.append(" WHERE ").append(s));
orderBy.ifPresent(s -> sql.append(" ORDER BY ").append(s));
return sql.toString();
}

Expand Down Expand Up @@ -396,11 +395,54 @@ private static String buildSelectWithBoundaryRowLimits(
sql.append(projection);
sql.append(" FROM ");
sql.append(quoteSchemaAndTable(tableId));
if (condition.isPresent()) {
sql.append(" WHERE ").append(condition.get());
}
condition.ifPresent(s -> sql.append(" WHERE ").append(s));
sql.append(" ORDER BY ").append(orderBy);
sql.append(") T");
return sql.toString();
}

/**
 * Compares two split-key values of the given split column.
 *
 * <p>When the column's type name equals {@code SqlServerTypeUtils.UNIQUEIDENTIFIRER}, both
 * values are parsed from their {@code toString()} form into {@link UUID}s and ordered with
 * {@link SQLServerUUIDComparator}. Every other column type falls back to {@code
 * ObjectUtils.compare}.
 *
 * @param obj1 the left-hand value
 * @param obj2 the right-hand value
 * @param splitColumn the column both values belong to; its type name selects the comparison
 * @return a negative number, zero, or a positive number as {@code obj1} sorts before, equal
 *     to, or after {@code obj2}
 */
public static int compare(Object obj1, Object obj2, Column splitColumn) {
    final String columnTypeName = splitColumn.typeName();
    if (!columnTypeName.equals(SqlServerTypeUtils.UNIQUEIDENTIFIRER)) {
        return ObjectUtils.compare(obj1, obj2);
    }

    final Comparator<UUID> guidOrdering = new SQLServerUUIDComparator();
    final UUID left = UUID.fromString(obj1.toString());
    final UUID right = UUID.fromString(obj2.toString());
    return guidOrdering.compare(left, right);
}

/**
 * Comparator for SQL Server UUIDs. SQL Server compares {@code uniqueidentifier} values in a
 * different order than {@link UUID#compareTo(UUID)}.
 *
 * <p>The reference implementation walks the byte array returned by .NET's {@code
 * Guid.ToByteArray()} in the order {@code 10..15, 8..9, 6..7, 4..5, 0..3}. In that array the
 * first three fields (4 + 2 + 2 bytes) are stored little-endian, whereas the bytes taken from a
 * Java {@link UUID} are big-endian throughout. {@link #GUID_ORDER} is therefore expressed
 * against the big-endian layout: the last eight bytes are visited exactly as in the reference,
 * and the bytes inside each of the first three fields are visited in reverse.
 *
 * <p>Reference code: <a
 * href="https://github.com/dotnet/runtime/blob/5535e31a712343a63f5d7d796cd874e563e5ac14/src/libraries/System.Data.Common/src/System/Data/SQLTypes/SQLGuid.cs#L113">SQLGuid.cs::CompareTo</a>
 * Reference doc: <a
 * href="https://learn.microsoft.com/en-us/sql/connect/ado-net/sql/compare-guid-uniqueidentifier-values?view=sql-server-ver16">Comparing
 * GUID and uniqueidentifier values</a>
 */
static class SQLServerUUIDComparator implements Comparator<UUID> {

    private static final int SIZE_OF_GUID = 16;

    /**
     * Indexes into the big-endian byte form of a {@link UUID}, from the most significant byte
     * for SQL Server's ordering to the least significant one.
     */
    private static final byte[] GUID_ORDER = {
        10, 11, 12, 13, 14, 15, 8, 9, 7, 6, 5, 4, 3, 2, 1, 0
    };

    /**
     * Compares two UUIDs using SQL Server's {@code uniqueidentifier} ordering.
     *
     * @param uuid1 the left-hand UUID
     * @param uuid2 the right-hand UUID
     * @return a negative number, zero, or a positive number as {@code uuid1} sorts before,
     *     equal to, or after {@code uuid2}
     */
    @Override
    public int compare(UUID uuid1, UUID uuid2) {
        byte[] bytes1 = uuidToBytes(uuid1);
        byte[] bytes2 = uuidToBytes(uuid2);

        for (int i = 0; i < SIZE_OF_GUID; i++) {
            // Mask to 0..255 so that bytes are compared as unsigned values.
            int b1 = bytes1[GUID_ORDER[i]] & 0xFF;
            int b2 = bytes2[GUID_ORDER[i]] & 0xFF;
            if (b1 != b2) {
                return Integer.compare(b1, b2);
            }
        }
        return 0;
    }

    /** Returns the 16 bytes of the UUID in big-endian order (most significant bits first). */
    private static byte[] uuidToBytes(UUID uuid) {
        ByteBuffer bb = ByteBuffer.allocate(SIZE_OF_GUID);
        bb.putLong(uuid.getMostSignificantBits());
        bb.putLong(uuid.getLeastSignificantBits());
        return bb.array();
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.sqlserver.source.utils;

import org.apache.flink.cdc.connectors.base.utils.ObjectUtils;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static org.junit.jupiter.api.Assertions.assertEquals;

/** Unit test for {@link SqlServerUtils.SQLServerUUIDComparator}. */
public class SQLServerUUIDComparatorTest {

    /**
     * Sorts the same three GUIDs once with {@code ObjectUtils.compare} and once with the SQL
     * Server comparator, and checks that the two orderings differ as expected. The sample values
     * come from Microsoft's "Comparing GUID and uniqueidentifier values" article.
     */
    @Test
    public void testComparator() {
        SqlServerUtils.SQLServerUUIDComparator comparator =
                new SqlServerUtils.SQLServerUUIDComparator();
        // Create an ArrayList and fill it with Guid values.
        List<UUID> guidList = new ArrayList<>();
        guidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"));
        guidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"));
        guidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"));

        // Sort the Guids.
        guidList.sort(ObjectUtils::compare);

        // assertEquals takes (expected, actual); comparing UUID values directly avoids any
        // dependence on the letter case of UUID#toString().
        assertEquals(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"), guidList.get(0));
        assertEquals(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"), guidList.get(1));
        assertEquals(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"), guidList.get(2));

        // Create an ArrayList of SqlGuids.
        List<UUID> sqlGuidList = new ArrayList<>();
        sqlGuidList.add(UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"));
        sqlGuidList.add(UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"));
        sqlGuidList.add(UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"));

        // Sort the SqlGuids. The unsorted SqlGuids are in the same order
        // as the unsorted Guid values.
        sqlGuidList.sort(comparator);

        // The sorted SqlGuid values are ordered differently than the Guid values.
        assertEquals(
                UUID.fromString("2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE"), sqlGuidList.get(0));
        assertEquals(
                UUID.fromString("3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE"), sqlGuidList.get(1));
        assertEquals(
                UUID.fromString("1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE"), sqlGuidList.get(2));
    }
}
Loading