diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
index 2c80c8aa9c..324d864bbf 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/pom.xml
@@ -278,6 +278,19 @@ limitations under the License.
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ test-jar
+
+ test-jar
+
+
+
+
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 2326240b6d..5812046f56 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -33,9 +33,18 @@ limitations under the License.
1.19.0
8.0.27
1.2.9_flink-${flink.major.version}
+ 2.8.5
+ 2.3.9
+ 0.7.0-incubating
+
+ org.apache.flink
+ flink-table-planner_2.12
+ ${flink.version}
+ test
+
org.apache.flink
flink-cdc-e2e-utils
@@ -99,6 +108,26 @@ limitations under the License.
test-jar
test
+
+ org.apache.flink
+ flink-connector-files
+ ${flink.version}
+ test
+
+
+
+ org.apache.paimon
+ paimon-flink-${flink.major.version}
+ ${paimon.version}
+ test
+
+
+ org.apache.flink
+ flink-cdc-pipeline-connector-paimon
+ ${project.version}
+ test-jar
+ test
+
org.apache.flink
flink-connector-test-util
@@ -113,6 +142,153 @@ limitations under the License.
${testcontainers.version}
test
+
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+ test
+
+
+ org.apache.avro
+ avro
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ jdk.tools
+ jdk.tools
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+
+ org.apache.hadoop
+ hadoop-hdfs
+ ${hadoop.version}
+ test
+
+
+ jdk.tools
+ jdk.tools
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+
+
+ org.apache.hive
+ hive-metastore
+ ${hive.version}
+ test
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.apache.orc
+ orc-core
+
+
+ jdk.tools
+ jdk.tools
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+
+ org.apache.hive
+ hive-exec
+ ${hive.version}
+ test
+
+
+ log4j
+ log4j
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+ org.apache.orc
+ orc-core
+
+
+ org.pentaho
+ *
+
+
+ org.apache.calcite
+ calcite-core
+
+
+ org.apache.calcite
+ calcite-druid
+
+
+ org.apache.calcite.avatica
+ avatica
+
+
+ org.apache.calcite
+ calcite-avatica
+
+
+
@@ -231,6 +407,24 @@ limitations under the License.
${project.build.directory}/dependencies
+
+ org.apache.flink
+ flink-cdc-pipeline-connector-paimon
+ ${project.version}
+ paimon-cdc-pipeline-connector.jar
+ jar
+ ${project.build.directory}/dependencies
+
+
+
+ org.apache.flink
+ flink-shaded-hadoop-2-uber
+ 2.8.3-10.0
+ flink-shade-hadoop.jar
+ jar
+ ${project.build.directory}/dependencies
+
+
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
new file mode 100644
index 0000000000..7485b652d0
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** End-to-end tests for mysql cdc to Paimon pipeline job. */
+@RunWith(Parameterized.class)
+public class MySqlToPaimonE2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlToPaimonE2eITCase.class);
+
+ public static final int TESTCASE_TIMEOUT_MILLIS_SECONDS = 60_000;
+
+ private TableEnvironment tEnv;
+
+ // ------------------------------------------------------------------------------------------
+ // MySQL Variables (we always use MySQL as the data source for easier verifying)
+ // ------------------------------------------------------------------------------------------
+ protected static final String MYSQL_TEST_USER = "mysqluser";
+ protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+
+ @ClassRule
+ public static final MySqlContainer MYSQL =
+ (MySqlContainer)
+ new MySqlContainer(
+ MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
+ .withConfigurationOverride("docker/mysql/my.cnf")
+ .withSetupSQL("docker/mysql/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("mysql")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ protected final UniqueDatabase mysqlInventoryDatabase =
+ new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ @BeforeClass
+ public static void initializeContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ mysqlInventoryDatabase.createAndInitialize();
+ tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ }
+
+ @After
+ public void after() {
+ super.after();
+ mysqlInventoryDatabase.dropDatabase();
+ }
+
+ @Test
+ public void testSyncWholeDatabase() throws Exception {
+ String warehouse = temporaryFolder.newFolder("paimon_" + UUID.randomUUID()).toString();
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG paimon_catalog WITH ('type'='paimon', 'warehouse'='%s')",
+ warehouse));
+ tEnv.executeSql("USE CATALOG paimon_catalog");
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: mysql\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: paimon\n"
+ + " catalog.properties.warehouse: %s\n"
+ + " catalog.properties.metastore: filesystem\n"
+ + " table.properties.bucket: 10\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: 4",
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName(),
+ warehouse);
+ Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path paimonCdcConnector = TestUtils.getResource("paimon-cdc-pipeline-connector.jar");
+ Path hadoopJar = TestUtils.getResource("flink-shade-hadoop.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, paimonCdcConnector, mysqlDriverJar, hadoopJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ String query =
+ String.format(
+ "SELECT * FROM %s.%s ORDER BY id",
+ mysqlInventoryDatabase.getDatabaseName(), "products");
+ validateSinkResult(
+ query,
+ Arrays.asList(
+ Row.of(
+ 101,
+ "scooter",
+ "Small 2-wheel scooter",
+ 3.14f,
+ "red",
+ "{\"key1\": \"value1\"}",
+ "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"),
+ Row.of(
+ 102,
+ "car battery",
+ "12V car battery",
+ 8.1f,
+ "white",
+ "{\"key2\": \"value2\"}",
+ "{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}"),
+ Row.of(
+ 103,
+ "12-pack drill bits",
+ "12-pack of drill bits with sizes ranging from #40 to #3",
+ 0.8f,
+ "red",
+ "{\"key3\": \"value3\"}",
+ "{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}"),
+ Row.of(
+ 104,
+ "hammer",
+ "12oz carpenter's hammer",
+ 0.75f,
+ "white",
+ "{\"key4\": \"value4\"}",
+ "{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}"),
+ Row.of(
+ 105,
+ "hammer",
+ "14oz carpenter's hammer",
+ 0.875f,
+ "red",
+ "{\"k1\": \"v1\", \"k2\": \"v2\"}",
+ "{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}"),
+ Row.of(106, "hammer", "16oz carpenter's hammer", 1.0f, null, null, null),
+ Row.of(107, "rocks", "box of assorted rocks", 5.3f, null, null, null),
+ Row.of(
+ 108,
+ "jacket",
+ "water resistent black wind breaker",
+ 0.1f,
+ null,
+ null,
+ null),
+ Row.of(109, "spare tire", "24 inch spare tire", 22.2f, null, null, null)));
+
+ query =
+ String.format(
+ "SELECT * FROM %s.%s ORDER BY id",
+ mysqlInventoryDatabase.getDatabaseName(), "customers");
+ validateSinkResult(
+ query,
+ Arrays.asList(
+ Row.of(101, "user_1", "Shanghai", "123567891234"),
+ Row.of(102, "user_2", "Shanghai", "123567891234"),
+ Row.of(103, "user_3", "Shanghai", "123567891234"),
+ Row.of(104, "user_4", "Shanghai", "123567891234")));
+
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ mysqlInventoryDatabase.getDatabaseName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+
+ stat.execute(
+ "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); // 110
+ stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+ // modify table schema
+ stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+ stat.execute("DELETE FROM products WHERE id=101;");
+
+ stat.execute(
+ "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null);"); // 111
+ stat.execute(
+ "INSERT INTO products VALUES (default,'finally', null, 2.14, null, null);"); // 112
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+ query =
+ String.format(
+ "SELECT * FROM %s.%s ORDER BY id",
+ mysqlInventoryDatabase.getDatabaseName(), "products");
+ validateSinkResult(
+ query,
+ Arrays.asList(
+ Row.ofKind(
+ RowKind.INSERT,
+ 102,
+ "car battery",
+ "12V car battery",
+ 8.1f,
+ "white",
+ "{\"key2\": \"value2\"}"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 103,
+ "12-pack drill bits",
+ "12-pack of drill bits with sizes ranging from #40 to #3",
+ 0.8f,
+ "red",
+ "{\"key3\": \"value3\"}"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 104,
+ "hammer",
+ "12oz carpenter's hammer",
+ 0.75f,
+ "white",
+ "{\"key4\": \"value4\"}"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 105,
+ "hammer",
+ "14oz carpenter's hammer",
+ 0.875f,
+ "red",
+ "{\"k1\": \"v1\", \"k2\": \"v2\"}"),
+ Row.ofKind(
+ RowKind.INSERT,
+ 106,
+ "hammer",
+ "18oz carpenter hammer",
+ 1.0f,
+ null,
+ null),
+ Row.ofKind(
+ RowKind.INSERT,
+ 107,
+ "rocks",
+ "box of assorted rocks",
+ 5.1f,
+ null,
+ null),
+ Row.ofKind(
+ RowKind.INSERT,
+ 108,
+ "jacket",
+ "water resistent black wind breaker",
+ 0.1f,
+ null,
+ null),
+ Row.ofKind(
+ RowKind.INSERT,
+ 109,
+ "spare tire",
+ "24 inch spare tire",
+ 22.2f,
+ null,
+ null),
+ Row.ofKind(
+ RowKind.INSERT,
+ 110,
+ "jacket",
+ "water resistent white wind breaker",
+ 0.2f,
+ null,
+ null),
+ Row.ofKind(
+ RowKind.INSERT,
+ 111,
+ "scooter",
+ "Big 2-wheel scooter ",
+ 5.18f,
+ null,
+ null),
+ Row.ofKind(RowKind.INSERT, 112, "finally", null, 2.14f, null, null)));
+ }
+
+ private void validateSinkResult(String query, List expected) {
+ long startWaitingTimestamp = System.currentTimeMillis();
+ boolean validateSucceed = false;
+ while (!validateSucceed) {
+ try {
+ List results = new ArrayList<>();
+ tEnv.executeSql(query).collect().forEachRemaining(results::add);
+ assertEqualsInSameOrder(expected, results);
+ validateSucceed = true;
+ } catch (Throwable e) {
+ if (System.currentTimeMillis() - startWaitingTimestamp
+ > TESTCASE_TIMEOUT_MILLIS_SECONDS) {
+ throw new RuntimeException("Failed to check result with given query.");
+ }
+ }
+ }
+ }
+
+ private static void assertEqualsInSameOrder(List expected, List actual) {
+ assertTrue(expected != null && actual != null);
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++) {
+ assertEquals(expected.get(i), actual.get(i));
+ }
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
index 65c0a202e5..7c2d55316d 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java
@@ -106,6 +106,9 @@ public void before() throws Exception {
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
.withExposedPorts(JOB_MANAGER_REST_PORT)
.withEnv("FLINK_PROPERTIES", flinkProperties)
+ .withFileSystemBind(
+ temporaryFolder.getRoot().getPath(),
+ temporaryFolder.getRoot().getPath())
.withLogConsumer(jobManagerConsumer);
taskManagerConsumer = new ToStringConsumer();
taskManager =
@@ -115,6 +118,9 @@ public void before() throws Exception {
.withNetwork(NETWORK)
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
.withEnv("FLINK_PROPERTIES", flinkProperties)
+ .withFileSystemBind(
+ temporaryFolder.getRoot().getPath(),
+ temporaryFolder.getRoot().getPath())
.dependsOn(jobManager)
.withLogConsumer(taskManagerConsumer);