Skip to content

Commit

Permalink
[hotfix][common] fix nullability when converting cdc data type to fli…
Browse files Browse the repository at this point in the history
…nk data type
  • Loading branch information
Shawn-Hx committed Nov 11, 2024
1 parent 57a1a7d commit b6922ce
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,17 @@ public static org.apache.flink.table.types.DataType toFlinkDataType(DataType typ
? org.apache.flink.table.api.DataTypes.ARRAY(
toFlinkDataType(children.get(0)))
: org.apache.flink.table.api.DataTypes.ARRAY(
toFlinkDataType(children.get(0)).notNull());
toFlinkDataType(children.get(0)))
.notNull();
case MAP:
Preconditions.checkState(children != null && children.size() > 1);
return type.isNullable()
? org.apache.flink.table.api.DataTypes.MAP(
toFlinkDataType(children.get(0)), toFlinkDataType(children.get(1)))
: org.apache.flink.table.api.DataTypes.MAP(
toFlinkDataType(children.get(0)),
toFlinkDataType(children.get(1)).notNull());
toFlinkDataType(children.get(0)),
toFlinkDataType(children.get(1)))
.notNull();
case ROW:
Preconditions.checkState(!CollectionUtil.isNullOrEmpty(children));
RowType rowType = (RowType) type;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@

package org.apache.flink.cdc.common.types.utils;

import org.apache.flink.cdc.common.types.DataField;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -88,46 +86,67 @@ class DataTypeUtilsTest {
DataTypes.ROW(DataTypes.SMALLINT(), DataTypes.STRING())
};

private static final org.apache.flink.table.types.DataType[] FLINK_TYPES =
new org.apache.flink.table.types.DataType[] {
BOOLEAN(),
BYTES(),
BINARY(10),
VARBINARY(10),
CHAR(10),
VARCHAR(10),
STRING(),
INT(),
TINYINT(),
SMALLINT(),
BIGINT(),
DOUBLE(),
FLOAT(),
DECIMAL(6, 3),
DATE(),
TIME(),
TIME(6),
TIMESTAMP(),
TIMESTAMP(6),
TIMESTAMP_LTZ(),
TIMESTAMP_LTZ(6),
TIMESTAMP_WITH_TIME_ZONE(),
TIMESTAMP_WITH_TIME_ZONE(6),
ARRAY(BIGINT()),
MAP(SMALLINT(), STRING()),
ROW(FIELD("f1", STRING()), FIELD("f2", STRING(), "desc")),
ROW(SMALLINT(), STRING())
};

@Test
void testToFlinkDataType() {
List<DataField> list =
IntStream.range(0, ALL_TYPES.length)
.mapToObj(i -> DataTypes.FIELD("f" + i, ALL_TYPES[i]))
.collect(Collectors.toList());
DataType cdcNullableDataType =
new RowType(
IntStream.range(0, ALL_TYPES.length)
.mapToObj(i -> DataTypes.FIELD("f" + i, ALL_TYPES[i]))
.collect(Collectors.toList()));
DataType cdcNotNullDataType =
new RowType(
IntStream.range(0, ALL_TYPES.length)
.mapToObj(i -> DataTypes.FIELD("f" + i, ALL_TYPES[i].notNull()))
.collect(Collectors.toList()));

org.apache.flink.table.types.DataType dataType =
DataTypeUtils.toFlinkDataType(new RowType(list));
org.apache.flink.table.types.DataType nullableDataType =
DataTypeUtils.toFlinkDataType(cdcNullableDataType);
org.apache.flink.table.types.DataType notNullDataType =
DataTypeUtils.toFlinkDataType(cdcNotNullDataType);

org.apache.flink.table.types.DataType expectedDataType =
org.apache.flink.table.types.DataType expectedNullableDataType =
ROW(
IntStream.range(0, FLINK_TYPES.length)
.mapToObj(i -> FIELD("f" + i, FLINK_TYPES[i]))
.collect(Collectors.toList()));
org.apache.flink.table.types.DataType expectedNotNullDataType =
ROW(
FIELD("f0", BOOLEAN()),
FIELD("f1", BYTES()),
FIELD("f2", BINARY(10)),
FIELD("f3", VARBINARY(10)),
FIELD("f4", CHAR(10)),
FIELD("f5", VARCHAR(10)),
FIELD("f6", STRING()),
FIELD("f7", INT()),
FIELD("f8", TINYINT()),
FIELD("f9", SMALLINT()),
FIELD("f10", BIGINT()),
FIELD("f11", DOUBLE()),
FIELD("f12", FLOAT()),
FIELD("f13", DECIMAL(6, 3)),
FIELD("f14", DATE()),
FIELD("f15", TIME()),
FIELD("f16", TIME(6)),
FIELD("f17", TIMESTAMP()),
FIELD("f18", TIMESTAMP(6)),
FIELD("f19", TIMESTAMP_LTZ()),
FIELD("f20", TIMESTAMP_LTZ(6)),
FIELD("f21", TIMESTAMP_WITH_TIME_ZONE()),
FIELD("f22", TIMESTAMP_WITH_TIME_ZONE(6)),
FIELD("f23", ARRAY(BIGINT())),
FIELD("f24", MAP(SMALLINT(), STRING())),
FIELD("f25", ROW(FIELD("f1", STRING()), FIELD("f2", STRING(), "desc"))),
FIELD("f26", ROW(SMALLINT(), STRING())));
IntStream.range(0, FLINK_TYPES.length)
.mapToObj(i -> FIELD("f" + i, FLINK_TYPES[i].notNull()))
.collect(Collectors.toList()));

assertThat(dataType).isEqualTo(expectedDataType);
assertThat(nullableDataType).isEqualTo(expectedNullableDataType);
assertThat(notNullDataType).isEqualTo(expectedNotNullDataType);
}
}

0 comments on commit b6922ce

Please sign in to comment.