Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
## 0.1.1
* Add support in FixedString, Date/Date32/DateTime/DateTime64, Uint8/16/32/64/128/256, Decimal, UUID
## 0.1.0
* ClickHouse Sink supports Apache Flink 1.17+
80 changes: 41 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,45 +132,47 @@ Planned for a future release — a complete end-to-end example will be added onc

## Supported ClickHouse Types

| Java Type | ClickHouse Type | Supported | Serialize Method |
|-----------------|-----------------|-----------|-------------------------|
| byte/Byte | Int8 | ✅ | Serialize.writeInt8 |
| short/Short | Int16 | ✅ | Serialize.writeInt16 |
| int/Integer | Int32 | ✅ | Serialize.writeInt32 |
| long/Long | Int64 | ✅ | Serialize.writeInt64 |
| BigInteger | Int128 | ✅ | Serialize.writeInt124 |
| BigInteger | Int256 | ✅ | Serialize.writeInt256 |
| byte/Byte | UInt8 | ❌ | N/A |
| short/Short | UInt16 | ❌ | N/A |
| int/Integer | UInt32 | ❌ | N/A |
| long/Long | UInt64 | ❌ | N/A |
| BigInteger | UInt128 | ❌ | N/A |
| BigInteger | UInt256 | ❌ | N/A |
| BigDecimal | Decimal | ❌ | N/A |
| BigDecimal | Decimal32 | ❌ | N/A |
| BigDecimal | Decimal64 | ❌ | N/A |
| BigDecimal | Decimal128 | ❌ | N/A |
| BigDecimal | Decimal256 | ❌ | N/A |
| float/Float | Float | ✅ | Serialize.writeFloat32 |
| double/Double | Double | ✅ | Serialize.writeFloat64 |
| boolean/Boolean | Boolean | ✅ | Serialize.writeBoolean |
| String | String | ✅ | Serialize.writeString |
| String | FixedString | ❌ | N/A |
| LocalDate | Date | ❌ | N/A |
| LocalDate | Date32 | ❌ | N/A |
| LocalDateTime | DateTime | ❌ | N/A |
| LocalDateTime | DateTime64 | ❌ | N/A |
| int/Integer | Time | ❌ | N/A |
| long/Long | Time64 | ❌ | N/A |
| byte/Byte | Enum8 | ✅ | Serialize.writeInt8 |
| int/Integer | Enum16 | ✅ | Serialize.writeInt16 |
| String | JSON | ❌ | N/A |
| Array<Type> | Array<Type> | ❌ | N/A |
| Map<K,V> | Map<K,V> | ❌ | N/A |
| Tuple<Type,..> | Map<T1,T2,..> | ❌ | N/A |
| Object | Variant | ❌ | N/A |

* For date operation need to provide ZoneId.
| Java Type | ClickHouse Type | Supported | Serialize Method |
|-----------------|-----------------|-----------|----------------------------|
| byte/Byte | Int8 | ✅ | Serialize.writeInt8 |
| short/Short | Int16 | ✅ | Serialize.writeInt16 |
| int/Integer | Int32 | ✅ | Serialize.writeInt32 |
| long/Long | Int64 | ✅ | Serialize.writeInt64 |
| BigInteger | Int128 | ✅ | Serialize.writeInt124 |
| BigInteger | Int256 | ✅ | Serialize.writeInt256 |
| int/Integer | UInt8 | ✅ | Serialize.writeUInt8 |
| int/Integer | UInt16 | ✅ | Serialize.writeUInt16 |
| long/Long | UInt32 | ✅ | Serialize.writeUInt32 |
| long/Long | UInt64 | ✅ | Serialize.writeUInt64 |
| BigInteger | UInt128 | ✅ | Serialize.writeUInt128 |
| BigInteger | UInt256 | ✅ | Serialize.writeUInt256 |
| BigDecimal | Decimal | ✅ | Serialize.writeDecimal |
| BigDecimal | Decimal32 | ✅ | Serialize.writeDecimal |
| BigDecimal | Decimal64 | ✅ | Serialize.writeDecimal |
| BigDecimal | Decimal128 | ✅ | Serialize.writeDecimal |
| BigDecimal | Decimal256 | ✅ | Serialize.writeDecimal |
| float/Float | Float | ✅ | Serialize.writeFloat32 |
| double/Double | Double | ✅ | Serialize.writeFloat64 |
| boolean/Boolean | Boolean | ✅ | Serialize.writeBoolean |
| String | String | ✅ | Serialize.writeString |
| String | FixedString | ✅ | Serialize.writeFixedString |
| LocalDate | Date | ✅ | Serialize.writeDate |
| LocalDate | Date32 | ✅ | Serialize.writeDate32 |
| LocalDateTime | DateTime | ✅ | Serialize.writeDateTime |
| LocalDateTime | DateTime64 | ✅ | Serialize.writeDateTime64 |
| int/Integer | Time | ❌ | N/A |
| long/Long | Time64 | ❌ | N/A |
| byte/Byte | Enum8 | ✅ | Serialize.writeInt8 |
| int/Integer | Enum16 | ✅ | Serialize.writeInt16 |
| java.util.UUID | UUID | ✅ | Serialize.writeIntUUID |
| String | JSON | ❌ | N/A |
| Array<Type> | Array<Type> | ❌ | N/A |
| Map<K,V> | Map<K,V> | ❌ | N/A |
| Tuple<Type,..> | Map<T1,T2,..> | ❌ | N/A |
| Object | Variant | ❌ | N/A |

* A ZoneId must also be provided when performing date operations.
* Precision and scale must also be provided when performing decimal operations.

## Configuration Options

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,56 @@ public class ClickHouseSinkTests extends FlinkClusterTests {

static final int STREAM_PARALLELISM = 5;

private String createSimplePOJOTableSQL(String database, String tableName, int parts_to_throw_insert) {
String createTable = createSimplePOJOTableSQL(database, tableName);
return createTable.trim().substring(0, createTable.trim().length() - 1) + " " + String.format("SETTINGS parts_to_throw_insert = %d;", parts_to_throw_insert);
}

private String createSimplePOJOTableSQL(String database, String tableName) {
return "CREATE TABLE `" + database + "`.`" + tableName + "` (" +
"bytePrimitive Int8," +
"byteObject Int8," +
"shortPrimitive Int16," +
"shortObject Int16," +
"intPrimitive Int32," +
"integerObject Int32," +
"longPrimitive Int64," +
"longObject Int64," +
"bigInteger128 Int128," +
"bigInteger256 Int256," +
"uint8Primitive UInt8," +
"uint8Object UInt8," +
"uint16Primitive UInt16," +
"uint16Object UInt16," +
"uint32Primitive UInt32," +
"uint32Object UInt32," +
"uint64Primitive UInt64," +
"uint64Object UInt64," +
"uint128Object UInt128," +
"uint256Object UInt256," +
"decimal Decimal(10,5)," +
"decimal32 Decimal32(9)," +
"decimal64 Decimal64(18)," +
"decimal128 Decimal128(38)," +
"decimal256 Decimal256(76)," +
"floatPrimitive Float," +
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
"booleanPrimitive Boolean," +
"booleanObject Boolean," +
"str String," +
"fixedStr FixedString(10)," +
"v_date Date," +
"v_date32 Date32," +
"v_dateTime DateTime," +
"v_dateTime64 DateTime64," +
"uuid UUID," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive); ";
}

private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception {
JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource");
int rows = 0;
Expand Down Expand Up @@ -197,27 +247,7 @@ void SimplePOJODataTest() throws Exception {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
ClickHouseServerForTests.executeSql(dropTable);
// create table
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
"bytePrimitive Int8," +
"byteObject Int8," +
"shortPrimitive Int16," +
"shortObject Int16," +
"intPrimitive Int32," +
"integerObject Int32," +
"longPrimitive Int64," +
"longObject Int64," +
"bigInteger128 Int128," +
"bigInteger256 Int256," +
"floatPrimitive Float," +
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
"booleanPrimitive Boolean," +
"booleanObject Boolean," +
"str String," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive); ";
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName);
ClickHouseServerForTests.executeSql(tableSql);


Expand Down Expand Up @@ -465,28 +495,7 @@ void SimplePOJODataTooManyPartsTest() throws Exception {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
ClickHouseServerForTests.executeSql(dropTable);
// create table
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
"bytePrimitive Int8," +
"byteObject Int8," +
"shortPrimitive Int16," +
"shortObject Int16," +
"intPrimitive Int32," +
"integerObject Int32," +
"longPrimitive Int64," +
"longObject Int64," +
"bigInteger128 Int128," +
"bigInteger256 Int256," +
"floatPrimitive Float," +
"floatObject Float," +
"doublePrimitive Double," +
"doubleObject Double," +
"booleanPrimitive Boolean," +
"booleanObject Boolean," +
"str String," +
") " +
"ENGINE = MergeTree " +
"ORDER BY (longPrimitive) " +
"SETTINGS parts_to_throw_insert = 10;";
String tableSql = createSimplePOJOTableSQL(getDatabase(), tableName, 10);
ClickHouseServerForTests.executeSql(tableSql);
//ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
Serialize.writeInt128(out, input.getBigInteger128(), false, false, ClickHouseDataType.Int128, false, "bigInteger128");
Serialize.writeInt256(out, input.getBigInteger256(), false, false, ClickHouseDataType.Int256, false, "bigInteger256");

// UIntX
Serialize.writeUInt8(out, input.getUint8Primitive(), false, false, ClickHouseDataType.UInt8, false, "uint8Primitive");
Serialize.writeUInt8(out, input.getUint8Object(), false, false, ClickHouseDataType.UInt8, false, "uint8Object");

Serialize.writeUInt16(out, input.getUint16Primitive(), false, false, ClickHouseDataType.UInt16, false, "uint8Primitive");
Serialize.writeUInt16(out, input.getUint16Object(), false, false, ClickHouseDataType.UInt16, false, "uint8Object");

Serialize.writeUInt32(out, input.getUint32Primitive(), false, false, ClickHouseDataType.UInt32, false, "uint8Primitive");
Serialize.writeUInt32(out, input.getUint32Object(), false, false, ClickHouseDataType.UInt32, false, "uint8Object");

Serialize.writeUInt64(out, input.getUint64Primitive(), false, false, ClickHouseDataType.UInt64, false, "uint8Primitive");
Serialize.writeUInt64(out, input.getUint64Object(), false, false, ClickHouseDataType.UInt64, false, "uint8Object");

Serialize.writeUInt128(out, input.getUint128Object(), false, false, ClickHouseDataType.UInt128, false, "bigInteger128");
Serialize.writeUInt256(out, input.getUint256Object(), false, false, ClickHouseDataType.UInt256, false, "bigInteger256");

Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal, false, "decimal", 10, 5);
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal32, false, "decimal32", 9, 1);
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal64, false, "decimal64", 18, 10);
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal128, false, "decimal128", 38, 19);
Serialize.writeDecimal(out, input.getBigDecimal(), false, false, ClickHouseDataType.Decimal256, false, "decimal256", 76, 39);

Serialize.writeFloat32(out, input.getFloatPrimitive(), false, false, ClickHouseDataType.Float32, false, "floatPrimitive");
Serialize.writeFloat32(out, input.getFloatObject(), false, false, ClickHouseDataType.Float32, false, "floatObject");

Expand All @@ -35,7 +57,14 @@ public void instrument(OutputStream out, SimplePOJO input) throws IOException {
Serialize.writeBoolean(out, input.isBooleanPrimitive(), false, false, ClickHouseDataType.Bool, false, "booleanPrimitive");
Serialize.writeBoolean(out, input.getBooleanObject(), false, false, ClickHouseDataType.Bool, false, "booleanObject");

Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "String");
Serialize.writeString(out, input.getStr(), false, false, ClickHouseDataType.String, false, "str");
Serialize.writeFixedString(out, input.getFixedStr(), false, false, ClickHouseDataType.FixedString, false, "fixedStr", 10);

Serialize.writeDate(out, input.getDate(), false, false, ClickHouseDataType.Date, false, "v_date");
Serialize.writeDate32(out, input.getDate32(), false, false, ClickHouseDataType.Date32, false, "v_date32");
Serialize.writeTimeDate(out, input.getDateTime(), false, false, ClickHouseDataType.DateTime, false, "v_dateTime");
Serialize.writeTimeDate64(out, input.getDateTime64(), false, false, ClickHouseDataType.DateTime64, false, "v_dateTime64", 1);

Serialize.writeUUID(out, input.getUuid(), false, false, ClickHouseDataType.UUID, false, "uuid");
}
}
Loading
Loading