Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
## 0.1.0
* ClickHouse Sink supports Apache Flink 1.17+
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Maven
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>0.0.1</version>
<version>0.1.0</version>
<type>pom</type>
</dependency>
```
Expand All @@ -61,7 +61,7 @@ Maven
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>0.0.1</version>
<version>0.1.0</version>
<type>pom</type>
</dependency>
```
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
id("com.gradleup.shadow") version "9.0.2"
}

val sinkVersion by extra("0.0.1")
val sinkVersion by extra("0.1.0")
val flinkVersion by extra("1.18.0")
val clickhouseVersion by extra("0.9.1")
val junitVersion by extra("5.8.2")
Expand Down
2 changes: 1 addition & 1 deletion examples/maven/flink-v1.7/covid/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ under the License.
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-1.17</artifactId>
<version>0.0.1</version>
Copy link
Member

@mshustov mshustov Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mzitnik can we start with 0.1.0 as we did for kafka connect sink? https://github.com/ClickHouse/clickhouse-kafka-connect/releases?page=4
I'm not comfortable with 1.0.0 since we might need to introduce breaking changes based on the initial feedback

Copy link
Contributor Author

@mzitnik mzitnik Sep 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mshustov
In Kafka Connect, we started with 0.0.1 https://github.com/ClickHouse/clickhouse-kafka-connect/releases/tag/v0.0.1-alfa
Looking at this, I was thinking that maybe for the DataStream API we use 1.0.0, and when we develop the Table API, we will use 2.0.0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mzitnik I know, 0.0.1 just looks like an alpha or beta version, which might look scary, so I'd prefer 0.1.0 to re-assure the customers

<version>0.1.0</version>
<classifier>all</classifier>
</dependency>

Expand Down
2 changes: 1 addition & 1 deletion examples/maven/flink-v2/covid/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ under the License.
<dependency>
<groupId>com.clickhouse.flink</groupId>
<artifactId>flink-connector-clickhouse-2.0.0</artifactId>
<version>0.0.1</version>
<version>0.1.0</version>
<classifier>all</classifier>
</dependency>

Expand Down
2 changes: 1 addition & 1 deletion examples/sbt/covid/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ libraryDependencies ++= Seq(
"org.apache.flink" % "flink-streaming-java" % flinkVersion % "provided",
"org.apache.flink" % "flink-clients" % flinkVersion % "provided",
"org.apache.flink" % "flink-connector-files" % "2.0.0" % "provided",
"org.apache.flink.connector" % "clickhouse" % "0.0.1" classifier "all"
"org.apache.flink.connector" % "clickhouse" % "0.1.0" classifier "all"
)

assembly / assemblyJarName := "covid.jar"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,11 +310,16 @@ void ProductNameTest() throws Exception {
lines.sinkTo(csvSink);
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
Assertions.assertEquals(EXPECTED_ROWS, rows);
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
if (ClickHouseServerForTests.isCloud())
Thread.sleep(5000);
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS ON CLUSTER 'default'");
else
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");

if (ClickHouseServerForTests.isCloud())
Thread.sleep(10000);
// wait until the data becomes available in the query log
String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName);
String startWith = String.format("Flink-ClickHouse-Sink/%s", ClickHouseSinkVersion.getVersion());
String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, startWith);
String compareString = String.format("Flink-ClickHouse-Sink/%s (fv:flink/%s, lv:scala/2.12)", ClickHouseSinkVersion.getVersion(), flinkVersion);

boolean isContains = productName.contains(compareString);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ public static int countRows(String table) throws ExecutionException, Interrupted
return countResult.get(0).getInteger(1);
}
// http_user_agent
public static String extractProductName(String databaseName, String tableName) {
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') LIMIT 100", databaseName, databaseName, tableName);
public static String extractProductName(String databaseName, String tableName, String startWith) {
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') and startsWith(http_user_agent, '%s') LIMIT 100", databaseName, databaseName, tableName, startWith);
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
List<GenericRecord> userAgentResult = client.queryAll(extractProductName);
if (!userAgentResult.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,16 @@ void ProductNameTest() throws Exception {
lines.sinkTo(csvSink);
int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS);
Assertions.assertEquals(EXPECTED_ROWS, rows);
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");
if (ClickHouseServerForTests.isCloud())
Thread.sleep(5000);
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS ON CLUSTER 'default'");
else
ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS");

if (ClickHouseServerForTests.isCloud())
Thread.sleep(10000);
// wait until the data becomes available in the query log
String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, "Flink-ClickHouse-Sink");
String startWith = String.format("Flink-ClickHouse-Sink/%s", ClickHouseSinkVersion.getVersion());
String productName = ClickHouseServerForTests.extractProductName(ClickHouseServerForTests.getDatabase(), tableName, startWith);
String compareString = String.format("Flink-ClickHouse-Sink/%s (fv:flink/2.0.0, lv:scala/2.12)", ClickHouseSinkVersion.getVersion());
boolean isContains = productName.contains(compareString);
Assertions.assertTrue(isContains, "Expected user agent to contain: " + compareString + " but got: " + productName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,15 @@ public static int countRows(String table) throws ExecutionException, Interrupted
}

// http_user_agent
public static String extractProductName(String databaseName, String tableName, String productNameStartWith) {
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') LIMIT 100", databaseName, databaseName, tableName);
public static String extractProductName(String databaseName, String tableName, String startWith) {
String extractProductName = String.format("SELECT http_user_agent, tables FROM clusterAllReplicas('default', system.query_log) WHERE type = 'QueryStart' AND query_kind = 'Insert' AND has(databases,'%s') AND has(tables,'%s.%s') and startsWith(http_user_agent, '%s') LIMIT 100", databaseName, databaseName, tableName, startWith);
Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password);
List<GenericRecord> userAgentResult = client.queryAll(extractProductName);
String userAgentValue = null;
if (!userAgentResult.isEmpty()) {
for (GenericRecord userAgent : userAgentResult) {
userAgentValue = userAgent.getString(1);
if (userAgentValue.contains(productNameStartWith))
if (userAgentValue.contains(startWith))
return userAgent.getString(1);
}
throw new RuntimeException("Can not extract product name from " + userAgentValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@

/**
 * Exposes the version string of the Flink ClickHouse Sink connector.
 *
 * <p>The value is embedded in the HTTP {@code User-Agent} header
 * (e.g. {@code Flink-ClickHouse-Sink/0.1.0}) so server-side query logs
 * can attribute inserts to this connector. Keep it in sync with the
 * {@code sinkVersion} declared in build.gradle.kts.
 */
public class ClickHouseSinkVersion {
    /**
     * Returns the connector version.
     *
     * @return the semantic version string, e.g. {@code "0.1.0"}
     */
    public static String getVersion() {
        // Fixed the diff-merge artifact that left two return statements
        // (the unreachable old "0.0.1" followed by "0.1.0"); only the
        // current release version remains.
        return "0.1.0";
    }
}
Loading