Skip to content

Commit

Permalink
[Improve][Connector-V2] Fake supports column configuration (apache#7503)
Browse files Browse the repository at this point in the history
* [Improve][Connector-V2] Fake supports column configuration

* [Improve][Connector-V2] optimized code
  • Loading branch information
corgy-w authored Aug 28, 2024
1 parent 696f294 commit 39162a4
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ public List<Column> parse(ReadonlyConfig schemaConfig) {
String value = entry.getValue();
SeaTunnelDataType<?> dataType =
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
PhysicalColumn column = PhysicalColumn.of(key, dataType, 0, true, null, null);
PhysicalColumn column =
PhysicalColumn.of(key, dataType, null, null, true, null, null);
columns.add(column);
}
return columns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
Expand All @@ -34,9 +34,11 @@

import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Function;

public class FakeDataGenerator {
private final CatalogTable catalogTable;
Expand Down Expand Up @@ -71,12 +73,11 @@ private SeaTunnelRow convertRow(FakeConfig.RowData rowData) {
}

private SeaTunnelRow randomRow() {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
String[] fieldNames = rowType.getFieldNames();
SeaTunnelDataType<?>[] fieldTypes = rowType.getFieldTypes();
List<Object> randomRow = new ArrayList<>(fieldNames.length);
for (SeaTunnelDataType<?> fieldType : fieldTypes) {
randomRow.add(randomColumnValue(fieldType));
// Generate random data according to the data type and data colum of the table
List<Column> physicalColumns = catalogTable.getTableSchema().getColumns();
List<Object> randomRow = new ArrayList<>(physicalColumns.size());
for (Column column : physicalColumns) {
randomRow.add(randomColumnValue(column));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(randomRow.toArray());
seaTunnelRow.setTableId(tableId);
Expand All @@ -103,15 +104,16 @@ public List<SeaTunnelRow> generateFakedRows(int rowNum) {
}

@SuppressWarnings("magicnumber")
private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
private Object randomColumnValue(Column column) {
SeaTunnelDataType<?> fieldType = column.getDataType();
switch (fieldType.getSqlType()) {
case ARRAY:
ArrayType<?, ?> arrayType = (ArrayType<?, ?>) fieldType;
SeaTunnelDataType<?> elementType = arrayType.getElementType();
int length = fakeConfig.getArraySize();
Object array = Array.newInstance(elementType.getTypeClass(), length);
for (int i = 0; i < length; i++) {
Object value = randomColumnValue(elementType);
Object value = randomColumnValue(column.copy(elementType));
Array.set(array, i, value);
}
return array;
Expand All @@ -122,64 +124,70 @@ private Object randomColumnValue(SeaTunnelDataType<?> fieldType) {
HashMap<Object, Object> objectMap = new HashMap<>();
int mapSize = fakeConfig.getMapSize();
for (int i = 0; i < mapSize; i++) {
Object key = randomColumnValue(keyType);
Object value = randomColumnValue(valueType);
Object key = randomColumnValue(column.copy(keyType));
Object value = randomColumnValue(column.copy(valueType));
objectMap.put(key, value);
}
return objectMap;
case STRING:
return fakeDataRandomUtils.randomString();
return value(column, String::toString, fakeDataRandomUtils::randomString);
case BOOLEAN:
return fakeDataRandomUtils.randomBoolean();
return value(column, Boolean::parseBoolean, fakeDataRandomUtils::randomBoolean);
case TINYINT:
return fakeDataRandomUtils.randomTinyint();
return value(column, Byte::parseByte, fakeDataRandomUtils::randomTinyint);
case SMALLINT:
return fakeDataRandomUtils.randomSmallint();
return value(column, Short::parseShort, fakeDataRandomUtils::randomSmallint);
case INT:
return fakeDataRandomUtils.randomInt();
return value(column, Integer::parseInt, fakeDataRandomUtils::randomInt);
case BIGINT:
return fakeDataRandomUtils.randomBigint();
return value(column, Long::parseLong, fakeDataRandomUtils::randomBigint);
case FLOAT:
return fakeDataRandomUtils.randomFloat();
return value(column, Float::parseFloat, fakeDataRandomUtils::randomFloat);
case DOUBLE:
return fakeDataRandomUtils.randomDouble();
return value(column, Double::parseDouble, fakeDataRandomUtils::randomDouble);
case DECIMAL:
DecimalType decimalType = (DecimalType) fieldType;
return fakeDataRandomUtils.randomBigDecimal(
decimalType.getPrecision(), decimalType.getScale());
return value(column, BigDecimal::new, fakeDataRandomUtils::randomBigDecimal);
case NULL:
return null;
case BYTES:
return fakeDataRandomUtils.randomBytes();
return value(column, String::getBytes, fakeDataRandomUtils::randomBytes);
case DATE:
return fakeDataRandomUtils.randomLocalDate();
return value(column, String::toString, fakeDataRandomUtils::randomLocalDate);
case TIME:
return fakeDataRandomUtils.randomLocalTime();
return value(column, String::toString, fakeDataRandomUtils::randomLocalTime);
case TIMESTAMP:
return fakeDataRandomUtils.randomLocalDateTime();
return value(column, String::toString, fakeDataRandomUtils::randomLocalDateTime);
case ROW:
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) fieldType).getFieldTypes();
Object[] objects = new Object[fieldTypes.length];
for (int i = 0; i < fieldTypes.length; i++) {
Object object = randomColumnValue(fieldTypes[i]);
Object object = randomColumnValue(column.copy(fieldTypes[i]));
objects[i] = object;
}
return new SeaTunnelRow(objects);
case BINARY_VECTOR:
return fakeDataRandomUtils.randomBinaryVector();
return fakeDataRandomUtils.randomBinaryVector(column);
case FLOAT_VECTOR:
return fakeDataRandomUtils.randomFloatVector();
return fakeDataRandomUtils.randomFloatVector(column);
case FLOAT16_VECTOR:
return fakeDataRandomUtils.randomFloat16Vector();
return fakeDataRandomUtils.randomFloat16Vector(column);
case BFLOAT16_VECTOR:
return fakeDataRandomUtils.randomBFloat16Vector();
return fakeDataRandomUtils.randomBFloat16Vector(column);
case SPARSE_FLOAT_VECTOR:
return fakeDataRandomUtils.randomSparseFloatVector();
return fakeDataRandomUtils.randomSparseFloatVector(column);
default:
// never got in there
throw new FakeConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
"SeaTunnel Fake source connector not support this data type");
}
}

private static <T> T value(
Column column, Function<String, T> convert, Function<Column, T> generate) {
if (column.getDefaultValue() != null) {
return convert.apply(column.getDefaultValue().toString());
}
return generate.apply(column);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.fake.utils;

import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.common.utils.BufferUtils;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;

Expand All @@ -25,6 +27,7 @@
import org.apache.commons.lang3.RandomUtils;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand All @@ -45,38 +48,42 @@ private static <T> T randomFromList(List<T> list) {
return list.get(index);
}

public Boolean randomBoolean() {
public Boolean randomBoolean(Column column) {
return RandomUtils.nextInt(0, 2) == 1;
}

public BigDecimal randomBigDecimal(int precision, int scale) {
public BigDecimal randomBigDecimal(Column column) {
DecimalType dataType = (DecimalType) column.getDataType();
return new BigDecimal(
RandomStringUtils.randomNumeric(precision - scale)
RandomStringUtils.randomNumeric(dataType.getPrecision() - dataType.getScale())
+ "."
+ RandomStringUtils.randomNumeric(scale));
+ RandomStringUtils.randomNumeric(dataType.getScale()));
}

public byte[] randomBytes() {
public byte[] randomBytes(Column column) {
return RandomStringUtils.randomAlphabetic(fakeConfig.getBytesLength()).getBytes();
}

public String randomString() {
public String randomString(Column column) {
List<String> stringTemplate = fakeConfig.getStringTemplate();
if (!CollectionUtils.isEmpty(stringTemplate)) {
return randomFromList(stringTemplate);
}
return RandomStringUtils.randomAlphabetic(fakeConfig.getStringLength());
return RandomStringUtils.randomAlphabetic(
column.getColumnLength() != null
? column.getColumnLength().intValue()
: fakeConfig.getStringLength());
}

public Byte randomTinyint() {
public Byte randomTinyint(Column column) {
List<Integer> tinyintTemplate = fakeConfig.getTinyintTemplate();
if (!CollectionUtils.isEmpty(tinyintTemplate)) {
return randomFromList(tinyintTemplate).byteValue();
}
return (byte) RandomUtils.nextInt(fakeConfig.getTinyintMin(), fakeConfig.getTinyintMax());
}

public Short randomSmallint() {
public Short randomSmallint(Column column) {
List<Integer> smallintTemplate = fakeConfig.getSmallintTemplate();
if (!CollectionUtils.isEmpty(smallintTemplate)) {
return randomFromList(smallintTemplate).shortValue();
Expand All @@ -85,48 +92,55 @@ public Short randomSmallint() {
RandomUtils.nextInt(fakeConfig.getSmallintMin(), fakeConfig.getSmallintMax());
}

public Integer randomInt() {
public Integer randomInt(Column column) {
List<Integer> intTemplate = fakeConfig.getIntTemplate();
if (!CollectionUtils.isEmpty(intTemplate)) {
return randomFromList(intTemplate);
}
return RandomUtils.nextInt(fakeConfig.getIntMin(), fakeConfig.getIntMax());
}

public Long randomBigint() {
public Long randomBigint(Column column) {
List<Long> bigTemplate = fakeConfig.getBigTemplate();
if (!CollectionUtils.isEmpty(bigTemplate)) {
return randomFromList(bigTemplate);
}
return RandomUtils.nextLong(fakeConfig.getBigintMin(), fakeConfig.getBigintMax());
}

public Float randomFloat() {
public Float randomFloat(Column column) {
List<Double> floatTemplate = fakeConfig.getFloatTemplate();
if (!CollectionUtils.isEmpty(floatTemplate)) {
return randomFromList(floatTemplate).floatValue();
}
return RandomUtils.nextFloat(
(float) fakeConfig.getFloatMin(), (float) fakeConfig.getFloatMax());
float v =
RandomUtils.nextFloat(
(float) fakeConfig.getFloatMin(), (float) fakeConfig.getFloatMax());
return column.getScale() == null
? v
: new BigDecimal(v).setScale(column.getScale(), RoundingMode.HALF_UP).floatValue();
}

public Double randomDouble() {
public Double randomDouble(Column column) {
List<Double> doubleTemplate = fakeConfig.getDoubleTemplate();
if (!CollectionUtils.isEmpty(doubleTemplate)) {
return randomFromList(doubleTemplate);
}
return RandomUtils.nextDouble(fakeConfig.getDoubleMin(), fakeConfig.getDoubleMax());
double v = RandomUtils.nextDouble(fakeConfig.getDoubleMin(), fakeConfig.getDoubleMax());
return column.getScale() == null
? v
: new BigDecimal(v).setScale(column.getScale(), RoundingMode.HALF_UP).floatValue();
}

public LocalDate randomLocalDate() {
return randomLocalDateTime().toLocalDate();
public LocalDate randomLocalDate(Column column) {
return randomLocalDateTime(column).toLocalDate();
}

public LocalTime randomLocalTime() {
return randomLocalDateTime().toLocalTime();
public LocalTime randomLocalTime(Column column) {
return randomLocalDateTime(column).toLocalTime();
}

public LocalDateTime randomLocalDateTime() {
public LocalDateTime randomLocalDateTime(Column column) {
int year;
int month;
int day;
Expand Down Expand Up @@ -172,25 +186,32 @@ public LocalDateTime randomLocalDateTime() {
return LocalDateTime.of(year, month, day, hour, minute, second);
}

public ByteBuffer randomBinaryVector() {
int byteCount = fakeConfig.getBinaryVectorDimension() / 8;
public ByteBuffer randomBinaryVector(Column column) {
int byteCount =
(column.getScale() != null)
? column.getScale() / 8
: fakeConfig.getBinaryVectorDimension() / 8;
// binary vector doesn't care endian since each byte is independent
return ByteBuffer.wrap(RandomUtils.nextBytes(byteCount));
}

public ByteBuffer randomFloatVector() {
Float[] floatVector = new Float[fakeConfig.getVectorDimension()];
for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
public ByteBuffer randomFloatVector(Column column) {
int count =
(column.getScale() != null) ? column.getScale() : fakeConfig.getVectorDimension();
Float[] floatVector = new Float[count];
for (int i = 0; i < count; i++) {
floatVector[i] =
RandomUtils.nextFloat(
fakeConfig.getVectorFloatMin(), fakeConfig.getVectorFloatMax());
}
return BufferUtils.toByteBuffer(floatVector);
}

public ByteBuffer randomFloat16Vector() {
Short[] float16Vector = new Short[fakeConfig.getVectorDimension()];
for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
public ByteBuffer randomFloat16Vector(Column column) {
int count =
(column.getScale() != null) ? column.getScale() : fakeConfig.getVectorDimension();
Short[] float16Vector = new Short[count];
for (int i = 0; i < count; i++) {
float value =
RandomUtils.nextFloat(
fakeConfig.getVectorFloatMin(), fakeConfig.getVectorFloatMax());
Expand All @@ -199,9 +220,11 @@ public ByteBuffer randomFloat16Vector() {
return BufferUtils.toByteBuffer(float16Vector);
}

public ByteBuffer randomBFloat16Vector() {
Short[] bfloat16Vector = new Short[fakeConfig.getVectorDimension()];
for (int i = 0; i < fakeConfig.getVectorDimension(); i++) {
public ByteBuffer randomBFloat16Vector(Column column) {
int count =
(column.getScale() != null) ? column.getScale() : fakeConfig.getVectorDimension();
Short[] bfloat16Vector = new Short[count];
for (int i = 0; i < count; i++) {
float value =
RandomUtils.nextFloat(
fakeConfig.getVectorFloatMin(), fakeConfig.getVectorFloatMax());
Expand All @@ -210,10 +233,10 @@ public ByteBuffer randomBFloat16Vector() {
return BufferUtils.toByteBuffer(bfloat16Vector);
}

public Map<Integer, Float> randomSparseFloatVector() {
public Map<Integer, Float> randomSparseFloatVector(Column column) {
Map<Integer, Float> sparseVector = new HashMap<>();

Integer nonZeroElements = fakeConfig.getVectorDimension();
int nonZeroElements =
(column.getScale() != null) ? column.getScale() : fakeConfig.getVectorDimension();
while (nonZeroElements > 0) {
Integer index = RandomUtils.nextInt();
Float value =
Expand Down
Loading

0 comments on commit 39162a4

Please sign in to comment.