[FLINK-35912] SqlServer CDC doesn't chunk UUID-typed columns correctly #3497
private static String reverse(String text) {
    String[] arrs = text.split("-");
    List<String> arrList = Arrays.asList(arrs);
    Collections.reverse(arrList);
    arrs = arrList.toArray(new String[0]);
    return String.join("-", arrs);
}

public static int compare(Object obj1, Object obj2, Column splitColumn) {
    if (splitColumn.typeName().equals(UNIQUEIDENTIFIRER)) {
        String sObj1 = reverse(obj1.toString());
        String sOjb2 = reverse(obj2.toString());
        return ObjectUtils.compare(sObj1, sOjb2);
    }
    return ObjectUtils.compare(obj1, obj2);
}
Would it be better to rename them to reverseUuid / compareUuid, and move these utility functions to the org.apache.flink.cdc.connectors.sqlserver.source.utils package?
It would be nice if we can add some test cases to verify UUID comparison logic. Test cases from Microsoft docs would be ideal.
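As a rough sketch of what such a check could look like, the snippet below sorts the three GUIDs used in Microsoft's documentation on comparing GUID and uniqueidentifier values and compares the result with the order SQL Server is documented to produce. It assumes SQL Server compares the dash-separated groups from last to first and that all inputs share the same letter case. The class and method names (UuidOrderSketch, reverseGroups, sortLikeSqlServer) are hypothetical and are not the names used in this PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;

public class UuidOrderSketch {

    // Hypothetical helper: put the last dash-separated group first, so a plain
    // string comparison looks at the groups in the order SQL Server is assumed to.
    static String reverseGroups(String uuid) {
        List<String> groups = Arrays.asList(uuid.toUpperCase(Locale.ROOT).split("-"));
        Collections.reverse(groups);
        return String.join("-", groups);
    }

    // Orders UUID strings by their group-reversed form (an assumption, see lead-in).
    static final Comparator<String> SQL_SERVER_LIKE_ORDER =
            Comparator.comparing(UuidOrderSketch::reverseGroups);

    // Returns a sorted copy and leaves the input list untouched.
    static List<String> sortLikeSqlServer(List<String> uuids) {
        List<String> sorted = new ArrayList<>(uuids);
        sorted.sort(SQL_SERVER_LIKE_ORDER);
        return sorted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE",
                "2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE",
                "1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE");
        List<String> expected = Arrays.asList(
                "2AAAAAAA-BBBB-CCCC-DDDD-1EEEEEEEEEEE",
                "3AAAAAAA-BBBB-CCCC-DDDD-2EEEEEEEEEEE",
                "1AAAAAAA-BBBB-CCCC-DDDD-3EEEEEEEEEEE");
        List<String> actual = sortLikeSqlServer(input);
        if (!actual.equals(expected)) {
            throw new AssertionError("unexpected order: " + actual);
        }
        System.out.println(actual);
    }
}
```

In a real test class the manual check in main would become an assertEquals on the two lists.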
"No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running");
"{} No maximum LSN recorded in the database; please ensure that the SQL Server Agent is running",
dataConnection.connectionString());
Is this change related to this PR?
Created FLINK-35912 to track this issue. Would be nice if you could rename this PR as
Thanks for @LiPL2017's quick response! Just left some minor comments on test cases.
testutils holds utility functions for testing, while this case is actually testing sqlserver.source.utils. Putting it in the test/java/org/apache/flink/cdc/connectors/sqlserver/source/utils package makes more sense.
public class SqlserverCompareUuidTest {
    private static final Logger LOG = LoggerFactory.getLogger(SqlserverCompareUuidTest.class);
    @Test
    public void testWorkWithGuids() throws Exception {
Please use assertEquals to verify UUID sorting results instead of logging them for manual checks.
Looks good! Except you need to run mvn spotless:apply to fix code style violations.
Are there any other adjustments needed besides these?
Could @GOODBOY008 please help drive this to be merged?
@LiPL2017 Can you rebase to
@@ -0,0 +1,53 @@
package org.apache.flink.cdc.connectors.sqlserver.source.utils;
Add license header.
Add license header.
It has been added
@LiPL2017 I made some changes to the UUID compare method by referring to the MS source code. Can you double-check? You should also rebase onto master to resolve the conflicts.
@GOODBOY008 Could you please review the code again?
LGTM. Once #3508 is merged, please rebase onto master to make CI green.
…olumns correctly (apache#3497) * resolve conficts * polish code to trigger ci --------- Co-authored-by: Kael <kael@fts.dev> Co-authored-by: gongzhongqiang <gongzhongqiang@gigacloudtech.com>
A majority of our SQL Server databases use uniqueidentifier columns as primary keys.
When we set 'scan.incremental.snapshot.enabled = true', Flink CDC tries to split each table into chunks.
The splitTableIntoChunks function relies on the queryMinMax function, which fails when calculating MIN(UUID) and MAX(UUID). The issue arises because the uniqueidentifier type in SQL Server does not support MIN and MAX in the usual mathematical sense: its values have no numeric minimum or maximum.
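To illustrate why the ordering matters for chunk boundaries, here is a minimal sketch that picks the smallest and largest key from a small set twice: once with Java's natural java.util.UUID ordering and once with a group-reversed ordering. The second ordering rests on the assumption that SQL Server compares uniqueidentifier groups from last to first. The class name UuidMinMaxSketch and its helpers are hypothetical and are not part of Flink CDC.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;

public class UuidMinMaxSketch {

    // Hypothetical sort key: the UUID's groups in reverse order.
    // UUID.toString() always yields lowercase hex, so the case is uniform.
    static String sortKey(UUID id) {
        List<String> groups = Arrays.asList(id.toString().split("-"));
        Collections.reverse(groups);
        return String.join("-", groups);
    }

    // Assumed approximation of SQL Server's uniqueidentifier ordering.
    static final Comparator<UUID> SQL_SERVER_LIKE = Comparator.comparing(UuidMinMaxSketch::sortKey);

    public static void main(String[] args) {
        List<UUID> keys = Arrays.asList(
                UUID.fromString("3aaaaaaa-bbbb-cccc-dddd-2eeeeeeeeeee"),
                UUID.fromString("2aaaaaaa-bbbb-cccc-dddd-1eeeeeeeeeee"),
                UUID.fromString("1aaaaaaa-bbbb-cccc-dddd-3eeeeeeeeeee"));

        // Java's natural UUID ordering and the SQL-Server-like ordering
        // disagree on which key is the smallest and which is the largest.
        System.out.println("Java min:     " + Collections.min(keys));
        System.out.println("Java max:     " + Collections.max(keys));
        System.out.println("SQL-like min: " + Collections.min(keys, SQL_SERVER_LIKE));
        System.out.println("SQL-like max: " + Collections.max(keys, SQL_SERVER_LIKE));
    }
}
```

If boundaries are computed under one ordering and rows are compared under the other, chunk ranges can come out empty or overlapping, which is the kind of mismatch a type-aware compare method is meant to avoid.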