Skip to content

Commit

Permalink
[FLINK-35592][cdc-connector][mysql] Fix MysqlDebeziumTimeConverter mi…
Browse files Browse the repository at this point in the history
…ss timezone when convert to timestamp

This closes #3332

Co-authored-by: Hang Ruan <ruanhang1993@hotmail.com>
  • Loading branch information
czy006 and ruanhang1993 authored Nov 14, 2024
1 parent d9ceee0 commit 13248fa
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,54 +117,67 @@ private void registerDateConverter(
registration.register(
SchemaBuilder.string().name(schemaName).optional(),
value -> {
log.debug(
"find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field "
+ "default:{}",
field.name(),
columnType,
value == null ? "null" : value,
field.hasDefaultValue() ? field.defaultValue() : "null");
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case DATE:
if (value instanceof Integer) {
return this.convertToDate(
columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case TIME:
if (value instanceof Long) {
long l =
Math.multiplyExact(
(Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case DATETIME:
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case TIMESTAMP:
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
try {
return convertDateObject(field, value, columnType);
} catch (Exception e) {
logConvertDateError(field, value);
throw new RuntimeException("MysqlDebeziumConverter error", e);
}
});
}

private void logConvertDateError(RelationalColumn field, Object value) {
String fieldName = field.name();
String fieldType = field.typeName().toUpperCase();
String defaultValue = "null";
if (field.hasDefaultValue() && field.defaultValue() != null) {
defaultValue = field.defaultValue().toString();
}
log.error(
"Find schema need to change dateType, but failed. Field name:{}, field type:{}, "
+ "field value:{}, field default value:{}",
fieldName,
fieldType,
value == null ? "null" : value,
defaultValue);
}

private Object convertDateObject(RelationalColumn field, Object value, String columnType) {
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l = Math.multiplyExact((Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
}
}

private Object convertToTimestampWithTimezone(String columnType, Object timestamp) {
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually
Expand All @@ -178,11 +191,12 @@ private Object convertToTimestampWithTimezone(String columnType, Object timestam
ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof OffsetDateTime) {
OffsetDateTime value = (OffsetDateTime) timestamp;
OffsetDateTime value =
((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime();
return ConvertTimeBceUtil.resolveEra(
value.toLocalDate(), value.format(timestampFormatter));
} else if (timestamp instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp;
ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(
zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof Instant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private void testReadDateConvertDataStreamSource(String timezone) throws Excepti

private void validTimestampValue(List<String> result) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00"};
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00", "15:04:00"};
for (String after : result) {
JsonNode jsonNode = mapper.readTree(after);
Assert.assertEquals(
Expand Down Expand Up @@ -232,7 +232,8 @@ private void checkData(TableResult tableResult) {
new String[] {
"+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]",
"+I[3, 00:00:00, null, null, 00:01:20]",
"+I[2, 00:00:00, null, null, 00:00:00]"
"+I[2, 00:00:00, null, null, 00:00:00]",
"+I[4, 15:04:00, null, null, 00:01:10]"
};

List<String> expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable));
Expand Down Expand Up @@ -283,7 +284,8 @@ private String buildMySqlConfigWithTimezone(String timezone) {
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
+ "binlog_row_image = FULL\n";
+ "binlog_row_image = FULL\n"
+ "sql_mode = ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION\n";
String timezoneConf = "default-time_zone = '" + timezone + "'\n";
Files.write(
cnf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, test
VALUES
(1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'),
(2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'),
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120);
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120),
(4,20240612150400, DEFAULT, NULL ,110);

0 comments on commit 13248fa

Please sign in to comment.