Skip to content

Commit

Permalink
[Hotfix][Connector-V2] Fixed lost data precision for decimal data typ…
Browse files Browse the repository at this point in the history
…es (apache#7527)
  • Loading branch information
dailai authored Aug 30, 2024
1 parent f49b263 commit df210ea
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -346,17 +346,18 @@ public static SeaTunnelRow convert(
*
* @param seaTunnelRow SeaTunnel row object
* @param seaTunnelRowType SeaTunnel row type
* @param tableSchema Paimon table schema
* @param sinkTableSchema Paimon table schema
* @return Paimon row object
*/
public static InternalRow reconvert(
SeaTunnelRow seaTunnelRow, SeaTunnelRowType seaTunnelRowType, TableSchema tableSchema) {
List<DataField> sinkTotalFields = tableSchema.fields();
SeaTunnelRow seaTunnelRow,
SeaTunnelRowType seaTunnelRowType,
TableSchema sinkTableSchema) {
List<DataField> sinkTotalFields = sinkTableSchema.fields();
int sourceTotalFields = seaTunnelRowType.getTotalFields();
if (sourceTotalFields != sinkTotalFields.size()) {
throw new CommonError()
.writeRowErrorWithFiledsCountNotMatch(
"Paimon", sourceTotalFields, sinkTotalFields.size());
throw CommonError.writeRowErrorWithFiledsCountNotMatch(
"Paimon", sourceTotalFields, sinkTotalFields.size());
}
BinaryRow binaryRow = new BinaryRow(sourceTotalFields);
BinaryWriter binaryWriter = new BinaryRowWriter(binaryRow);
Expand Down Expand Up @@ -399,14 +400,17 @@ public static InternalRow reconvert(
binaryWriter.writeDouble(i, (Double) seaTunnelRow.getField(i));
break;
case DECIMAL:
DecimalType fieldType = (DecimalType) seaTunnelRowType.getFieldType(i);
DataField decimalDataField =
SchemaUtil.getDataField(sinkTotalFields, fieldName);
org.apache.paimon.types.DecimalType decimalType =
(org.apache.paimon.types.DecimalType) decimalDataField.type();
binaryWriter.writeDecimal(
i,
Decimal.fromBigDecimal(
(BigDecimal) seaTunnelRow.getField(i),
fieldType.getPrecision(),
fieldType.getScale()),
fieldType.getPrecision());
decimalType.getPrecision(),
decimalType.getScale()),
decimalType.getPrecision());
break;
case STRING:
binaryWriter.writeString(
Expand Down Expand Up @@ -464,9 +468,12 @@ public static InternalRow reconvert(
SeaTunnelDataType<?> rowType = seaTunnelRowType.getFieldType(i);
Object row = seaTunnelRow.getField(i);
InternalRow paimonRow =
reconvert((SeaTunnelRow) row, (SeaTunnelRowType) rowType, tableSchema);
reconvert(
(SeaTunnelRow) row,
(SeaTunnelRowType) rowType,
sinkTableSchema);
RowType paimonRowType =
RowTypeConverter.reconvert((SeaTunnelRowType) rowType, tableSchema);
RowTypeConverter.reconvert((SeaTunnelRowType) rowType, sinkTableSchema);
binaryWriter.writeRow(i, paimonRow, new InternalRowSerializer(paimonRowType));
break;
default:
Expand All @@ -489,12 +496,25 @@ private static void checkCanWriteWithType(
DataField exceptDataField = new DataField(i, sourceFieldName, exceptDataType);
DataType sinkDataType = sinkDataField.type();
if (!exceptDataType.getTypeRoot().equals(sinkDataType.getTypeRoot())) {
throw new CommonError()
.writeRowErrorWithSchemaIncompatibleSchema(
"Paimon",
sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
exceptDataField.asSQLString(),
sinkDataField.asSQLString());
throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
"Paimon",
sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
exceptDataField.asSQLString(),
sinkDataField.asSQLString());
}
if (sourceFieldType instanceof DecimalType
&& sinkDataType instanceof org.apache.paimon.types.DecimalType) {
DecimalType sourceDecimalType = (DecimalType) sourceFieldType;
org.apache.paimon.types.DecimalType sinkDecimalType =
(org.apache.paimon.types.DecimalType) sinkDataType;
if (sinkDecimalType.getPrecision() < sourceDecimalType.getPrecision()
|| sinkDecimalType.getScale() < sourceDecimalType.getScale()) {
throw CommonError.writeRowErrorWithSchemaIncompatibleSchema(
"Paimon",
sourceFieldName + StringUtils.SPACE + sourceFieldType.getSqlType(),
exceptDataField.asSQLString(),
sinkDataField.asSQLString());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import org.apache.commons.lang3.StringUtils;
import org.apache.paimon.data.BinaryArray;
import org.apache.paimon.data.BinaryArrayWriter;
import org.apache.paimon.data.BinaryMap;
Expand Down Expand Up @@ -66,45 +69,54 @@ public class RowConverterTest {

private SeaTunnelRowType seaTunnelRowType;

private TableSchema tableSchema;

public static final RowType DEFAULT_ROW_TYPE =
RowType.of(
new DataType[] {
DataTypes.TINYINT(),
DataTypes.SMALLINT(),
DataTypes.INT(),
DataTypes.BIGINT(),
DataTypes.FLOAT(),
DataTypes.DOUBLE(),
DataTypes.DECIMAL(10, 10),
DataTypes.STRING(),
DataTypes.BYTES(),
DataTypes.BOOLEAN(),
DataTypes.DATE(),
DataTypes.TIMESTAMP(),
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
DataTypes.ARRAY(DataTypes.STRING())
},
new String[] {
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_string",
"c_bytes",
"c_boolean",
"c_date",
"c_timestamp",
"c_map",
"c_array"
});

public static final List<String> KEY_NAME_LIST = Arrays.asList("c_tinyint");

public TableSchema getTableSchema(int decimalPrecision, int decimalScale) {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.TINYINT(),
DataTypes.SMALLINT(),
DataTypes.INT(),
DataTypes.BIGINT(),
DataTypes.FLOAT(),
DataTypes.DOUBLE(),
DataTypes.DECIMAL(decimalPrecision, decimalScale),
DataTypes.STRING(),
DataTypes.BYTES(),
DataTypes.BOOLEAN(),
DataTypes.DATE(),
DataTypes.TIMESTAMP(),
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
DataTypes.ARRAY(DataTypes.STRING())
},
new String[] {
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_string",
"c_bytes",
"c_boolean",
"c_date",
"c_timestamp",
"c_map",
"c_array"
});

return new TableSchema(
0,
TableSchema.newFields(rowType),
rowType.getFieldCount(),
Collections.EMPTY_LIST,
KEY_NAME_LIST,
Collections.EMPTY_MAP,
"");
}

@BeforeEach
public void before() {
seaTunnelRowType =
Expand Down Expand Up @@ -215,27 +227,33 @@ public void before() {
binaryRowWriter.writeArray(
13, binaryArray2, new InternalArraySerializer(DataTypes.STRING()));
internalRow = binaryRow;

tableSchema =
new TableSchema(
0,
TableSchema.newFields(DEFAULT_ROW_TYPE),
DEFAULT_ROW_TYPE.getFieldCount(),
Collections.EMPTY_LIST,
KEY_NAME_LIST,
Collections.EMPTY_MAP,
"");
}

@Test
public void seaTunnelToPaimon() {
InternalRow convert = RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, tableSchema);
Assertions.assertEquals(convert, internalRow);
SeaTunnelRuntimeException actualException =
Assertions.assertThrows(
SeaTunnelRuntimeException.class,
() ->
RowConverter.reconvert(
seaTunnelRow, seaTunnelRowType, getTableSchema(10, 10)));
SeaTunnelRuntimeException exceptedException =
CommonError.writeRowErrorWithSchemaIncompatibleSchema(
"Paimon",
"c_decimal" + StringUtils.SPACE + "DECIMAL",
"`c_decimal` DECIMAL(30, 8)",
"`c_decimal` DECIMAL(10, 10)");
Assertions.assertEquals(exceptedException.getMessage(), actualException.getMessage());

InternalRow reconvert =
RowConverter.reconvert(seaTunnelRow, seaTunnelRowType, getTableSchema(30, 8));
Assertions.assertEquals(reconvert, internalRow);
}

@Test
public void paimonToSeaTunnel() {
SeaTunnelRow convert = RowConverter.convert(internalRow, seaTunnelRowType, tableSchema);
SeaTunnelRow convert =
RowConverter.convert(internalRow, seaTunnelRowType, getTableSchema(10, 10));
Assertions.assertEquals(convert, seaTunnelRow);
}
}

0 comments on commit df210ea

Please sign in to comment.