From 13248faa226816966b787c7570fb4ed2ae08da86 Mon Sep 17 00:00:00 2001 From: ConradJam Date: Thu, 14 Nov 2024 11:24:21 +0800 Subject: [PATCH] [FLINK-35592][cdc-connector][mysql] Fix MysqlDebeziumTimeConverter miss timezone when convert to timestamp This closes #3332 Co-authored-by: Hang Ruan --- .../MysqlDebeziumTimeConverter.java | 106 ++++++++++-------- .../MysqlDebeziumTimeConverterITCase.java | 8 +- .../test/resources/ddl/date_convert_test.sql | 3 +- 3 files changed, 67 insertions(+), 50 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java index c893d18011e..753bb8f2d03 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -117,54 +117,67 @@ private void registerDateConverter( registration.register( SchemaBuilder.string().name(schemaName).optional(), value -> { - log.debug( - "find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field " - + "default:{}", - field.name(), - columnType, - value == null ? "null" : value, - field.hasDefaultValue() ? 
field.defaultValue() : "null"); - if (value == null) { - return convertDateDefaultValue(field); - } - switch (columnType.toUpperCase(Locale.ROOT)) { - case DATE: - if (value instanceof Integer) { - return this.convertToDate( - columnType, LocalDate.ofEpochDay((Integer) value)); - } - return this.convertToDate(columnType, value); - case TIME: - if (value instanceof Long) { - long l = - Math.multiplyExact( - (Long) value, TimeUnit.MICROSECONDS.toNanos(1)); - return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l)); - } - return this.convertToTime(columnType, value); - case DATETIME: - if (value instanceof Long) { - if (getTimePrecision(field) <= 3) { - return this.convertToTimestamp( - columnType, - Conversions.toInstantFromMillis((Long) value)); - } - if (getTimePrecision(field) <= 6) { - return this.convertToTimestamp( - columnType, - Conversions.toInstantFromMicros((Long) value)); - } - } - return this.convertToTimestamp(columnType, value); - case TIMESTAMP: - return this.convertToTimestampWithTimezone(columnType, value); - default: - throw new IllegalArgumentException( - "Unknown field type " + columnType.toUpperCase(Locale.ROOT)); + try { + return convertDateObject(field, value, columnType); + } catch (Exception e) { + logConvertDateError(field, value); + throw new RuntimeException("MysqlDebeziumConverter error", e); } }); } + private void logConvertDateError(RelationalColumn field, Object value) { + String fieldName = field.name(); + String fieldType = field.typeName().toUpperCase(Locale.ROOT); + String defaultValue = "null"; + if (field.hasDefaultValue() && field.defaultValue() != null) { + defaultValue = field.defaultValue().toString(); + } + log.error( + "Find schema need to change dateType, but failed. Field name:{}, field type:{}, " + + "field value:{}, field default value:{}", + fieldName, + fieldType, + value == null ? 
"null" : value, + defaultValue); + } + + private Object convertDateObject(RelationalColumn field, Object value, String columnType) { + if (value == null) { + return convertDateDefaultValue(field); + } + switch (columnType.toUpperCase(Locale.ROOT)) { + case "DATE": + if (value instanceof Integer) { + return this.convertToDate(columnType, LocalDate.ofEpochDay((Integer) value)); + } + return this.convertToDate(columnType, value); + case "TIME": + if (value instanceof Long) { + long l = Math.multiplyExact((Long) value, TimeUnit.MICROSECONDS.toNanos(1)); + return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l)); + } + return this.convertToTime(columnType, value); + case "DATETIME": + if (value instanceof Long) { + if (getTimePrecision(field) <= 3) { + return this.convertToTimestamp( + columnType, Conversions.toInstantFromMillis((Long) value)); + } + if (getTimePrecision(field) <= 6) { + return this.convertToTimestamp( + columnType, Conversions.toInstantFromMicros((Long) value)); + } + } + return this.convertToTimestamp(columnType, value); + case "TIMESTAMP": + return this.convertToTimestampWithTimezone(columnType, value); + default: + throw new IllegalArgumentException( + "Unknown field type " + columnType.toUpperCase(Locale.ROOT)); + } + } + private Object convertToTimestampWithTimezone(String columnType, Object timestamp) { // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type. // Conceptually, a timestamp with timezone is an Instant. 
But t.toInstant() actually @@ -178,11 +191,12 @@ private Object convertToTimestampWithTimezone(String columnType, Object timestam ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId); return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter)); } else if (timestamp instanceof OffsetDateTime) { - OffsetDateTime value = (OffsetDateTime) timestamp; + OffsetDateTime value = + ((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime(); return ConvertTimeBceUtil.resolveEra( value.toLocalDate(), value.format(timestampFormatter)); } else if (timestamp instanceof ZonedDateTime) { - ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp; + ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId); return ConvertTimeBceUtil.resolveEra( zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter)); } else if (timestamp instanceof Instant) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java index d14f31f9787..8adb92454ae 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java @@ -154,7 +154,7 @@ private void testReadDateConvertDataStreamSource(String timezone) throws Excepti private void validTimestampValue(List result) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); - String[] timestampValues = new String[] {"14:23:00", 
"00:00:00", "00:00:00"}; + String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00", "15:04:00"}; for (String after : result) { JsonNode jsonNode = mapper.readTree(after); Assert.assertEquals( @@ -232,7 +232,8 @@ private void checkData(TableResult tableResult) { new String[] { "+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]", "+I[3, 00:00:00, null, null, 00:01:20]", - "+I[2, 00:00:00, null, null, 00:00:00]" + "+I[2, 00:00:00, null, null, 00:00:00]", + "+I[4, 15:04:00, null, null, 00:01:10]" }; List expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable)); @@ -283,7 +284,8 @@ private String buildMySqlConfigWithTimezone(String timezone) { + "binlog_format = row\n" + "log_bin = mysql-bin\n" + "server-id = 223344\n" - + "binlog_row_image = FULL\n"; + + "binlog_row_image = FULL\n" + + "sql_mode = ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION\n"; String timezoneConf = "default-time_zone = '" + timezone + "'\n"; Files.write( cnf, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql index 262c1ceb19f..ed9aadfdc3a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql @@ -33,4 +33,5 @@ INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, test VALUES (1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'), (2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'), -(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120); \ No newline at end of file +(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120), +(4,20240612150400, DEFAULT, NULL ,110); \ No newline at end of file