From 02e663d79f295f6aeafe28b246b3115974a40c9a Mon Sep 17 00:00:00 2001
From: sunxiaojian <sunxiaojian926@163.com>
Date: Wed, 17 Apr 2024 12:08:55 +0800
Subject: [PATCH] Fix schema handling in Postgres sync-database action and add IT cases
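
Pass the configured schema name to the Postgres CDC source instead of an
empty schema list, update the postgres_sync_database help text to the
databaseName_schemaName_tableName naming used when --merge-shards is
disabled (e.g. paimon_sync_database_without_merging_shard_1_t1), expose
monitoredTables()/excludedTables() for assertions, and add
PostgresSyncDatabaseActionITCase together with its setup SQL.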

---
 .../postgres/PostgresSyncDatabaseAction.java  |   12 +-
 .../PostgresSyncDatabaseActionFactory.java    |    2 +-
 .../postgres/PostgresActionITCaseBase.java    |   14 +
 .../PostgresSyncDatabaseActionITCase.java     | 1398 +++++++++++++++++
 .../postgres/sync_database_setup.sql          |  479 ++++++
 5 files changed, 1903 insertions(+), 2 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseActionITCase.java
 create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_database_setup.sql

diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseAction.java
index a3c3f3eb1602a..d6626377db4e0 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseAction.java
@@ -74,6 +74,16 @@ public PostgresSyncDatabaseAction(
         this.mode = DIVIDED;
     }
 
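+    /** (table identifier, Postgres schema) pairs that the source will monitor; exposed for IT cases. */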
+    public List<Pair<Identifier, String>> monitoredTables() {
+        return monitoredTables;
+    }
+
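+    /** (table identifier, Postgres schema) pairs excluded from monitoring; exposed for IT cases. */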
+    public List<Pair<Identifier, String>> excludedTables() {
+        return excludedTables;
+    }
+
     public PostgresSyncDatabaseAction ignoreIncompatible(boolean ignoreIncompatible) {
         this.ignoreIncompatible = ignoreIncompatible;
         return this;
@@ -179,7 +189,7 @@ protected Object buildSource() {
             return PostgresActionUtils.buildPostgresSource(
                     cdcSourceConfig,
                     // todo
-                    new String[] {},
+                    new String[] {cdcSourceConfig.get(PostgresSourceOptions.SCHEMA_NAME)},
                     new String[] {
                         PostgresActionUtils.tableList(
                                 mode,
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseActionFactory.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseActionFactory.java
index bfc1f21f2bce0..243f579e77db4 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseActionFactory.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseActionFactory.java
@@ -116,7 +116,7 @@ public void printHelp() {
                 "--merge-shards is default true, in this case, if some tables in different databases have the same name, "
                         + "their schemas will be merged and their records will be synchronized into one Paimon table. "
                         + "Otherwise, each table's records will be synchronized to a corresponding Paimon table, "
-                        + "and the Paimon table will be named to 'databaseName_tableName' to avoid potential name conflict.");
+                        + "and the Paimon table will be named to 'databaseName_schemaName_tableName' to avoid potential name conflict.");
         System.out.println();
 
         System.out.println(
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java
index 17010ed9a92e2..e2d77e0550783 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionITCaseBase.java
@@ -119,6 +119,11 @@ protected String getSlotName() {
         return "paimon_" + id;
     }
 
+    protected PostgresSyncDatabaseActionBuilder syncDatabaseActionBuilder(
+            Map<String, String> postgresConfig) {
+        return new PostgresSyncDatabaseActionBuilder(postgresConfig);
+    }
+
     protected PostgresSyncTableActionBuilder syncTableActionBuilder(
             Map<String, String> postgresConfig) {
         return new PostgresSyncTableActionBuilder(postgresConfig);
@@ -132,4 +137,13 @@ public PostgresSyncTableActionBuilder(Map<String, String> postgresConfig) {
             super(PostgresSyncTableAction.class, postgresConfig);
         }
     }
+
+    /** Builder to build {@link PostgresSyncDatabaseAction} from action arguments. */
+    protected class PostgresSyncDatabaseActionBuilder
+            extends SyncDatabaseActionBuilder<PostgresSyncDatabaseAction> {
+
+        public PostgresSyncDatabaseActionBuilder(Map<String, String> postgresConfig) {
+            super(PostgresSyncDatabaseAction.class, postgresConfig);
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseActionITCase.java
new file mode 100644
index 0000000000000..b3da8507f38ed
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresSyncDatabaseActionITCase.java
@@ -0,0 +1,1398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action.cdc.postgres;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalogOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.MultiTablesSinkMode;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.JsonSerdeUtil;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
+import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT cases for {@link PostgresSyncDatabaseAction}. */
+public class PostgresSyncDatabaseActionITCase extends PostgresActionITCaseBase {
+
+    @TempDir java.nio.file.Path tempDir;
+    private static final String DATABASE_NAME = "paimon_sync_database";
+
+    @BeforeAll
+    public static void startContainers() {
+        POSTGRES_CONTAINER.withSetupSQL("postgres/sync_database_setup.sql");
+        start();
+    }
+
+    @Override
+    protected Map<String, String> getBasicPostgresConfig() {
+        Map<String, String> postgresConfig = super.getBasicPostgresConfig();
+        postgresConfig.put("database-name", DATABASE_NAME);
+        return postgresConfig;
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolution() throws Exception {
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("schema-name", "paimon_sync_schema");
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            testSchemaEvolutionImpl(statement);
+        }
+    }
+
+    private void testSchemaEvolutionImpl(Statement statement) throws Exception {
+        FileStoreTable table1 = getFileStoreTable("t1");
+        FileStoreTable table2 = getFileStoreTable("t2");
+
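+        // search_path lets the unqualified table names below resolve to the schema under test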
+        statement.executeUpdate("SET search_path TO paimon_sync_schema");
+
+        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
+        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");
+        statement.executeUpdate("INSERT INTO t3 VALUES (-1)");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                        new String[] {"k", "v1"});
+        List<String> primaryKeys1 = Collections.singletonList("k");
+        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10).notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"k1", "k2", "v1", "v2"});
+        List<String> primaryKeys2 = Arrays.asList("k1", "k2");
+        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 INT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
+        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v3 VARCHAR(10)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 60, 600, 'string_6')");
+        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 80, 800, 'string_8')");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
+                        },
+                        new String[] {"k", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, NULL]",
+                        "+I[3, three, NULL]",
+                        "+I[5, five, 50]",
+                        "+I[7, seven, 70]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10).notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"k1", "k2", "v1", "v2", "v3"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, 20, 200, NULL]",
+                        "+I[4, four, 40, 400, NULL]",
+                        "+I[6, six, 60, 600, string_6]",
+                        "+I[8, eight, 80, 800, string_8]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v2 BIGINT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 9000000000000)");
+        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v3 VARCHAR(20)");
+        statement.executeUpdate(
+                "INSERT INTO t2 VALUES (10, 'ten', 100, 1000, 'long_long_string_10')");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()
+                        },
+                        new String[] {"k", "v1", "v2"});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, NULL]",
+                        "+I[3, three, NULL]",
+                        "+I[5, five, 50]",
+                        "+I[7, seven, 70]",
+                        "+I[9, nine, 9000000000000]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10).notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.VARCHAR(20)
+                        },
+                        new String[] {"k1", "k2", "v1", "v2", "v3"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, 20, 200, NULL]",
+                        "+I[4, four, 40, 400, NULL]",
+                        "+I[6, six, 60, 600, string_6]",
+                        "+I[8, eight, 80, 800, string_8]",
+                        "+I[10, ten, 100, 1000, long_long_string_10]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+    }
+
+    @Test
+    public void testSpecifiedPostgresTable() {
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("schema-name", "paimon_sync_schema");
+        postgresConfig.put("table-name", "my_table");
+
+        PostgresSyncDatabaseAction action = syncDatabaseActionBuilder(postgresConfig).build();
+
+        assertThatThrownBy(action::run)
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "table-name cannot be set for postgres_sync_database. "
+                                + "If you want to sync several Postgres tables into one Paimon table, "
+                                + "use postgres_sync_table instead.");
+    }
+
+    @Test
+    public void testInvalidDatabase() {
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("database-name", "invalid");
+
+        PostgresSyncDatabaseAction action = syncDatabaseActionBuilder(postgresConfig).build();
+
+        assertThatThrownBy(action::run)
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessage(
+                        "No tables found in Postgres database invalid, or Postgres database does not exist.");
+    }
+
+    @Test
+    @Timeout(60)
+    public void testIgnoreIncompatibleTables() throws Exception {
+        // create an incompatible table
+        createFileStoreTable(
+                "incompatible",
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
+                        new String[] {"k", "v1"}),
+                Collections.emptyList(),
+                Collections.singletonList("k"),
+                Collections.emptyMap());
+
+        // try synchronization
+        Map<String, String> PostgresConfig = getBasicPostgresConfig();
+        PostgresConfig.put("schema-name", "paimon_sync_schema_ignore_incompatible");
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(PostgresConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .ignoreIncompatible(true)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        // validate `compatible` can be synchronized
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            FileStoreTable table = getFileStoreTable("compatible");
+
+            statement.executeUpdate("SET search_path TO paimon_sync_database_ignore_incompatible");
+            statement.executeUpdate("INSERT INTO compatible VALUES (2, 'two', 20, 200)");
+            statement.executeUpdate("INSERT INTO compatible VALUES (4, 'four', 40, 400)");
+
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(10).notNull(),
+                                DataTypes.INT(),
+                                DataTypes.BIGINT()
+                            },
+                            new String[] {"k1", "k2", "v1", "v2"});
+            List<String> primaryKeys2 = Arrays.asList("k1", "k2");
+            List<String> expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
+            waitForResult(expected, table, rowType, primaryKeys2);
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testTableAffix() throws Exception {
+        // create table t1
+        createFileStoreTable(
+                "test_prefix_t1_test_suffix",
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.VARCHAR(10)},
+                        new String[] {"k1", "v0"}),
+                Collections.emptyList(),
+                Collections.singletonList("k1"),
+                Collections.emptyMap());
+
+        // try synchronization
+        Map<String, String> PostgresConfig = getBasicPostgresConfig();
+        PostgresConfig.put("schema-name", "paimon_sync_schema_affix");
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(PostgresConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withTablePrefix("test_prefix_")
+                        .withTableSuffix("_test_suffix")
+                        // test including check with affix
+                        .includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*")
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            testTableAffixImpl(statement);
+        }
+    }
+
+    private void testTableAffixImpl(Statement statement) throws Exception {
+        FileStoreTable table1 = getFileStoreTable("test_prefix_t1_test_suffix");
+        FileStoreTable table2 = getFileStoreTable("test_prefix_t2_test_suffix");
+
+        statement.executeUpdate("SET search_path TO paimon_sync_database_affix");
+
+        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two')");
+        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four')");
+
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                        new String[] {"k1", "v0"});
+        List<String> primaryKeys1 = Collections.singletonList("k1");
+        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                        new String[] {"k2", "v0"});
+        List<String> primaryKeys2 = Collections.singletonList("k2");
+        expected = Arrays.asList("+I[2, two]", "+I[4, four]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 INT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
+        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v1 VARCHAR(10)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 's_6')");
+        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 's_8')");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
+                        },
+                        new String[] {"k1", "v0", "v1"});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, NULL]",
+                        "+I[3, three, NULL]",
+                        "+I[5, five, 50]",
+                        "+I[7, seven, 70]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(10)
+                        },
+                        new String[] {"k2", "v0", "v1"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, NULL]",
+                        "+I[4, four, NULL]",
+                        "+I[6, six, s_6]",
+                        "+I[8, eight, s_8]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v1 BIGINT");
+        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 9000000000000)");
+        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v1 VARCHAR(20)");
+        statement.executeUpdate("INSERT INTO t2 VALUES (10, 'ten', 'long_s_10')");
+
+        rowType1 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()
+                        },
+                        new String[] {"k1", "v0", "v1"});
+        expected =
+                Arrays.asList(
+                        "+I[1, one, NULL]",
+                        "+I[3, three, NULL]",
+                        "+I[5, five, 50]",
+                        "+I[7, seven, 70]",
+                        "+I[9, nine, 9000000000000]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(20)
+                        },
+                        new String[] {"k2", "v0", "v1"});
+        expected =
+                Arrays.asList(
+                        "+I[2, two, NULL]",
+                        "+I[4, four, NULL]",
+                        "+I[6, six, s_6]",
+                        "+I[8, eight, s_8]",
+                        "+I[10, ten, long_s_10]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testIncludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                "paimon_sync_schema_including",
+                "flink|paimon.+",
+                null,
+                Arrays.asList("flink", "paimon_1", "paimon_2"),
+                Collections.singletonList("ignored"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testExcludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                "paimon_sync_schema_excluding",
+                null,
+                "flink|paimon.+",
+                Collections.singletonList("sync"),
+                Arrays.asList("flink", "paimon_1", "paimon_2"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testIncludingAndExcludingTables() throws Exception {
+        includingAndExcludingTablesImpl(
+                "paimon_sync_schema_in_excluding",
+                "flink|paimon.+",
+                "paimon_1",
+                Arrays.asList("flink", "paimon_2"),
+                Arrays.asList("paimon_1", "test"));
+    }
+
+    private void includingAndExcludingTablesImpl(
+            String schemaName,
+            @Nullable String includingTables,
+            @Nullable String excludingTables,
+            List<String> existedTables,
+            List<String> notExistedTables)
+            throws Exception {
+        // try synchronization
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("schema-name", schemaName);
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .includingTables(includingTables)
+                        .excludingTables(excludingTables)
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        // check paimon tables
+        assertExactlyExistTables(existedTables);
+        assertTableNotExists(notExistedTables);
+    }
+
+    @Test
+    @Timeout(60)
+    public void testIgnoreCase() throws Exception {
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        String schemaName = "paimon_ignore_CASE";
+        postgresConfig.put("schema-name", schemaName);
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false"))
+                        .withTableConfig(getBasicTableConfig())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        // check table schema
+        FileStoreTable table = getFileStoreTable("t");
+        assertThat(JsonSerdeUtil.toFlatJson(table.schema().fields()))
+                .isEqualTo(
+                        "[{\"id\":0,\"name\":\"k\",\"type\":\"INT NOT NULL\",\"description\":\"\"},"
+                                + "{\"id\":1,\"name\":\"uppercase_v0\",\"type\":\"VARCHAR(20)\",\"description\":\"\"}]");
+
+        // check sync schema changes and records
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            statement.executeUpdate("SET search_path TO paimon_ignore_CASE");
+            statement.executeUpdate("INSERT INTO T VALUES (1, 'Hi')");
+            RowType rowType1 =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(20)},
+                            new String[] {"k", "uppercase_v0"});
+            waitForResult(
+                    Collections.singletonList("+I[1, Hi]"),
+                    table,
+                    rowType1,
+                    Collections.singletonList("k"));
+
+            statement.executeUpdate("ALTER TABLE T MODIFY COLUMN UPPERCASE_V0 VARCHAR(30)");
+            statement.executeUpdate("INSERT INTO T VALUES (2, 'Paimon')");
+            RowType rowType2 =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(30)},
+                            new String[] {"k", "uppercase_v0"});
+            waitForResult(
+                    Arrays.asList("+I[1, Hi]", "+I[2, Paimon]"),
+                    table,
+                    rowType2,
+                    Collections.singletonList("k"));
+
+            statement.executeUpdate("ALTER TABLE T ADD COLUMN UPPERCASE_V1 DOUBLE");
+            statement.executeUpdate("INSERT INTO T VALUES (3, 'Test', 0.5)");
+            RowType rowType3 =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), DataTypes.VARCHAR(30), DataTypes.DOUBLE()
+                            },
+                            new String[] {"k", "uppercase_v0", "uppercase_v1"});
+            waitForResult(
+                    Arrays.asList("+I[1, Hi, NULL]", "+I[2, Paimon, NULL]", "+I[3, Test, 0.5]"),
+                    table,
+                    rowType3,
+                    Collections.singletonList("k"));
+        }
+    }
+
+    @Test
+    @Timeout(600)
+    public void testNewlyAddedTables() throws Exception {
+        testNewlyAddedTable(1, true, false, "paimon_sync_schema_newly_added_tables");
+    }
+
+    @Test
+    @Timeout(600)
+    public void testNewlyAddedTableSingleTable() throws Exception {
+        testNewlyAddedTable(1, false, false, "paimon_sync_schema_newly_added_tables_1");
+    }
+
+    @Test
+    @Timeout(600)
+    public void testNewlyAddedTableMultipleTables() throws Exception {
+        testNewlyAddedTable(3, false, false, "paimon_sync_schema_newly_added_tables_2");
+    }
+
+    @Test
+    @Timeout(600)
+    public void testNewlyAddedTableSchemaChange() throws Exception {
+        testNewlyAddedTable(1, false, true, "paimon_sync_database_schema_added_tables_3");
+    }
+
+    @Test
+    @Timeout(600)
+    public void testNewlyAddedTableSingleTableWithSavepoint() throws Exception {
+        testNewlyAddedTable(1, true, true, "paimon_sync_schema_newly_added_tables_4");
+    }
+
+    @Test
+    @Timeout(120)
+    public void testAddIgnoredTable() throws Exception {
+        String postgresSchema = "paimon_sync_schema_add_ignored_table";
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("schema-name", postgresSchema);
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .includingTables("t.+")
+                        .excludingTables(".*a$")
+                        .withMode(COMBINED.configString())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            FileStoreTable table1 = getFileStoreTable("t1");
+
+            statement.executeUpdate("SET search_path TO " + postgresSchema);
+            statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+            statement.executeUpdate("INSERT INTO a VALUES (1, 'one')");
+
+            // make sure the job steps into incremental phase
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                            new String[] {"k", "v1"});
+            List<String> primaryKeys = Collections.singletonList("k");
+            waitForResult(Collections.singletonList("+I[1, one]"), table1, rowType, primaryKeys);
+
+            // create new tables at runtime
+            // synchronized table: t2, t22
+            statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+            statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
+
+            statement.executeUpdate("CREATE TABLE t22 LIKE t2");
+            statement.executeUpdate("INSERT INTO t22 VALUES (1, 'Hello')");
+
+            // not synchronized tables: ta, t3, t4
+            statement.executeUpdate("CREATE TABLE ta (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+            statement.executeUpdate("INSERT INTO ta VALUES (1, 'Apache')");
+            statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10))");
+            statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Paimon')");
+            statement.executeUpdate("CREATE TABLE t4 SELECT * FROM t2");
+
+            statement.executeUpdate("INSERT INTO t1 VALUES (2, 'two')");
+            waitForResult(Arrays.asList("+I[1, one]", "+I[2, two]"), table1, rowType, primaryKeys);
+
+            // check tables
+            assertExactlyExistTables("t1", "t2", "t22");
+            assertTableNotExists("a", "ta", "t3", "t4");
+
+            FileStoreTable newTable = getFileStoreTable("t2");
+            waitForResult(Collections.singletonList("+I[1, Hi]"), newTable, rowType, primaryKeys);
+
+            newTable = getFileStoreTable("t22");
+            waitForResult(
+                    Collections.singletonList("+I[1, Hello]"), newTable, rowType, primaryKeys);
+        }
+    }
+
+    public void testNewlyAddedTable(
+            int numOfNewlyAddedTables,
+            boolean testSavepointRecovery,
+            boolean testSchemaChange,
+            String schemaName)
+            throws Exception {
+        JobClient client =
+                buildSyncDatabaseActionWithNewlyAddedTables(schemaName, testSchemaChange);
+        waitJobRunning(client);
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            testNewlyAddedTableImpl(
+                    client,
+                    statement,
+                    numOfNewlyAddedTables,
+                    testSavepointRecovery,
+                    testSchemaChange,
+                    schemaName);
+        }
+    }
+
+    private void testNewlyAddedTableImpl(
+            JobClient client,
+            Statement statement,
+            int newlyAddedTableCount,
+            boolean testSavepointRecovery,
+            boolean testSchemaChange,
+            String schemaName)
+            throws Exception {
+        FileStoreTable table1 = getFileStoreTable("t1");
+        FileStoreTable table2 = getFileStoreTable("t2");
+
+        statement.executeUpdate("SET search_path TO " + schemaName);
+
+        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
+        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
+        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");
+        RowType rowType1 =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                        new String[] {"k", "v1"});
+        List<String> primaryKeys1 = Collections.singletonList("k");
+        List<String> expected = Arrays.asList("+I[1, one]", "+I[3, three]");
+        waitForResult(expected, table1, rowType1, primaryKeys1);
+
+        RowType rowType2 =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(),
+                            DataTypes.VARCHAR(10).notNull(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT()
+                        },
+                        new String[] {"k1", "k2", "v1", "v2"});
+        List<String> primaryKeys2 = Arrays.asList("k1", "k2");
+        expected = Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        // Create new tables at runtime. The Flink job is guaranteed to be in the
+        //    incremental sync phase by now, because a newly added table cannot be
+        //    captured in the snapshot phase.
+        Map<String, List<Tuple2<Integer, String>>> recordsMap = new HashMap<>();
+        List<String> newTablePrimaryKeys = Collections.singletonList("k");
+        RowType newTableRowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                        new String[] {"k", "v1"});
+        int newTableCount = 0;
+        String newTableName = getNewTableName(newTableCount);
+
+        createNewTable(statement, newTableName);
+        statement.executeUpdate(
+                String.format("INSERT INTO `%s`.`t2` VALUES (8, 'eight', 80, 800)", schemaName));
+        List<Tuple2<Integer, String>> newTableRecords = getNewTableRecords();
+        recordsMap.put(newTableName, newTableRecords);
+        List<String> newTableExpected = getNewTableExpected(newTableRecords);
+        insertRecordsIntoNewTable(statement, schemaName, newTableName, newTableRecords);
+
+        // suspend the job and restart from savepoint
+        if (testSavepointRecovery) {
+            String savepoint =
+                    client.stopWithSavepoint(
+                                    false,
+                                    tempDir.toUri().toString(),
+                                    SavepointFormatType.CANONICAL)
+                            .join();
+            assertThat(savepoint).isNotBlank();
+
+            client =
+                    buildSyncDatabaseActionWithNewlyAddedTables(
+                            savepoint, schemaName, testSchemaChange);
+            waitJobRunning(client);
+        }
+
+        // wait until table t2 contains the updated record, and then check
+        //     for existence of first newly added table
+        expected =
+                Arrays.asList(
+                        "+I[2, two, 20, 200]", "+I[4, four, 40, 400]", "+I[8, eight, 80, 800]");
+        waitForResult(expected, table2, rowType2, primaryKeys2);
+
+        FileStoreTable newTable = getFileStoreTable(newTableName);
+        waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
+
+        for (newTableCount = 1; newTableCount < newlyAddedTableCount; ++newTableCount) {
+            // create new table
+            newTableName = getNewTableName(newTableCount);
+            createNewTable(statement, newTableName);
+
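+            // empirically sized pause so the combined-mode source can discover the new table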
+            Thread.sleep(5000L);
+
+            // insert records
+            newTableRecords = getNewTableRecords();
+            recordsMap.put(newTableName, newTableRecords);
+            insertRecordsIntoNewTable(statement, schemaName, newTableName, newTableRecords);
+            newTable = getFileStoreTable(newTableName);
+            newTableExpected = getNewTableExpected(newTableRecords);
+            waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
+        }
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        // pick a random newly added table and insert records
+        int pick = random.nextInt(newlyAddedTableCount);
+        String tableName = getNewTableName(pick);
+        List<Tuple2<Integer, String>> records = recordsMap.get(tableName);
+        records.add(Tuple2.of(80, "eighty"));
+        newTable = getFileStoreTable(tableName);
+        newTableExpected = getNewTableExpected(records);
+        statement.executeUpdate(
+                String.format(
+                        "INSERT INTO `%s`.`%s` VALUES (80, 'eighty')", schemaName, tableName));
+
+        waitForResult(newTableExpected, newTable, newTableRowType, newTablePrimaryKeys);
+
+        // test schema change
+        if (testSchemaChange) {
+            pick = random.nextInt(newlyAddedTableCount);
+            tableName = getNewTableName(pick);
+            records = recordsMap.get(tableName);
+
+            statement.executeUpdate(
+                    String.format(
+                            "ALTER TABLE `%s`.`%s` ADD COLUMN v2 INT", schemaName, tableName));
+            statement.executeUpdate(
+                    String.format(
+                            "INSERT INTO `%s`.`%s` VALUES (100, 'hundred', 10000)",
+                            schemaName, tableName));
+
+            List<String> expectedRecords =
+                    records.stream()
+                            .map(tuple -> String.format("+I[%d, %s, NULL]", tuple.f0, tuple.f1))
+                            .collect(Collectors.toList());
+            expectedRecords.add("+I[100, hundred, 10000]");
+
+            newTable = getFileStoreTable(tableName);
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()
+                            },
+                            new String[] {"k", "v1", "v2"});
+            waitForResult(expectedRecords, newTable, rowType, newTablePrimaryKeys);
+
+            // test that catalog loader works
+            assertThat(getFileStoreTable(tableName).options())
+                    .containsEntry("alter-table-test", "true");
+        }
+    }
+
+    private List<String> getNewTableExpected(List<Tuple2<Integer, String>> newTableRecords) {
+        return newTableRecords.stream()
+                .map(tuple -> String.format("+I[%d, %s]", tuple.f0, tuple.f1))
+                .collect(Collectors.toList());
+    }
+
+    private List<Tuple2<Integer, String>> getNewTableRecords() {
+        List<Tuple2<Integer, String>> records = new LinkedList<>();
+        int count = ThreadLocalRandom.current().nextInt(10) + 1;
+        for (int i = 0; i < count; i++) {
+            records.add(Tuple2.of(i, "varchar_" + i));
+        }
+        return records;
+    }
+
+    private void insertRecordsIntoNewTable(
+            Statement statement,
+            String schemaName,
+            String newTableName,
+            List<Tuple2<Integer, String>> newTableRecords)
+            throws SQLException {
+        String sql =
+                String.format(
+                        "INSERT INTO %s.%s VALUES %s",
+                        schemaName,
+                        newTableName,
+                        newTableRecords.stream()
+                                .map(tuple -> String.format("(%d, '%s')", tuple.f0, tuple.f1))
+                                .collect(Collectors.joining(", ")));
+        statement.executeUpdate(sql);
+    }
+
+    private String getNewTableName(int newTableCount) {
+        return "t_new_table_" + newTableCount;
+    }
+
+    private void createNewTable(Statement statement, String newTableName) throws SQLException {
+        statement.executeUpdate(
+                String.format(
+                        "CREATE TABLE %s (k INT, v1 VARCHAR(10), PRIMARY KEY (k))", newTableName));
+    }
+
+    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
+            String schemaName, boolean testSchemaChange) throws Exception {
+        return buildSyncDatabaseActionWithNewlyAddedTables(null, schemaName, testSchemaChange);
+    }
+
+    private JobClient buildSyncDatabaseActionWithNewlyAddedTables(
+            String savepointPath, String schemaName, boolean testSchemaChange) throws Exception {
+
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("schema-name", schemaName);
+        postgresConfig.put("scan.incremental.snapshot.chunk.size", "1");
+
+        Map<String, String> catalogConfig =
+                testSchemaChange
+                        ? Collections.singletonMap(
+                                CatalogOptions.METASTORE.key(), "test-alter-table")
+                        : Collections.emptyMap();
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withCatalogConfig(catalogConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .includingTables("t.+")
+                        .withMode(COMBINED.configString())
+                        .build();
+        action.withStreamExecutionEnvironment(env).build();
+
+        if (Objects.nonNull(savepointPath)) {
+            StreamGraph streamGraph = env.getStreamGraph();
+            JobGraph jobGraph = streamGraph.getJobGraph();
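+            // restore with allowNonRestoredState = true so unmatched state does not fail the job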
+            jobGraph.setSavepointRestoreSettings(
+                    SavepointRestoreSettings.forPath(savepointPath, true));
+            return env.executeAsync(streamGraph);
+        }
+        return env.executeAsync();
+    }
+
+    @Test
+    @Timeout(240)
+    public void testSyncManyTableWithLimitedMemory() throws Exception {
+        String schemaName = "many_table_sync_test";
+        int newTableCount = 100;
+        int recordsCount = 100;
+        List<Tuple2<Integer, String>> newTableRecords = new ArrayList<>();
+        List<String> expectedRecords = new ArrayList<>();
+
+        for (int i = 0; i < recordsCount; i++) {
+            newTableRecords.add(Tuple2.of(i, "string_" + i));
+            expectedRecords.add(String.format("+I[%d, %s]", i, "string_" + i));
+        }
+
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("schema-name", schemaName);
+        postgresConfig.put("scan.incremental.snapshot.chunk.size", "1");
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        tableConfig.put("sink.parallelism", "1");
+        tableConfig.put(CoreOptions.WRITE_BUFFER_SIZE.key(), "4 mb");
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withTableConfig(tableConfig)
+                        .withMode(COMBINED.configString())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            statement.executeUpdate("SET search_path TO " + schemaName);
+            // wait for checkpointing to step into the incremental phase
+            Thread.sleep(2_000);
+
+            List<String> tables = new ArrayList<>();
+            tables.add("a");
+            for (int i = 0; i < newTableCount; i++) {
+                tables.add("t" + i);
+                Thread thread = new Thread(new SyncNewTableJob(i, statement, newTableRecords));
+                thread.start();
+            }
+
+            waitingTables(tables);
+
+            RowType newTableRowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                            new String[] {"k", "v1"});
+            List<String> newTablePrimaryKeys = Collections.singletonList("k");
+            for (int i = 0; i < newTableCount; i++) {
+                FileStoreTable newTable = getFileStoreTable("t" + i);
+                waitForResult(expectedRecords, newTable, newTableRowType, newTablePrimaryKeys);
+            }
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    @Disabled
+    public void testSyncMultipleShards() throws Exception {
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+
+        // test table list
+        postgresConfig.put(
+                "schema-name",
+                ThreadLocalRandom.current().nextBoolean()
+                        ? "schema_shard_.*"
+                        : "schema_shard_1|schema_shard_2");
+
+        MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withMode(mode.configString())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            // test insert into t1
+            statement.executeUpdate("INSERT INTO schema_shard_1.t1 VALUES (1, 'db1_1')");
+            statement.executeUpdate("INSERT INTO schema_shard_1.t1 VALUES (2, 'db1_2')");
+
+            statement.executeUpdate("INSERT INTO schema_shard_2.t1 VALUES (3, 'db2_3', 300)");
+            statement.executeUpdate("INSERT INTO schema_shard_2.t1 VALUES (4, 'db2_4', 400)");
+
+            FileStoreTable table = getFileStoreTable("t1");
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT()
+                            },
+                            new String[] {"k", "v1", "v2"});
+            waitForResult(
+                    Arrays.asList(
+                            "+I[1, db1_1, NULL]",
+                            "+I[2, db1_2, NULL]",
+                            "+I[3, db2_3, 300]",
+                            "+I[4, db2_4, 400]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test schema evolution of t2
+            statement.executeUpdate("ALTER TABLE schema_shard_1.t2 ADD COLUMN v2 INT");
+            statement.executeUpdate("ALTER TABLE schema_shard_2.t2 ADD COLUMN v3 VARCHAR(10)");
+            statement.executeUpdate("INSERT INTO schema_shard_1.t2 VALUES (1, 1.1, 1)");
+            statement.executeUpdate("INSERT INTO schema_shard_1.t2 VALUES (2, 2.2, 2)");
+            statement.executeUpdate("INSERT INTO schema_shard_2.t2 VALUES (3, 3.3, 'db2_3')");
+            statement.executeUpdate("INSERT INTO schema_shard_2.t2 VALUES (4, 4.4, 'db2_4')");
+            table = getFileStoreTable("t2");
+            rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.BIGINT().notNull(),
+                                DataTypes.DOUBLE(),
+                                DataTypes.INT(),
+                                DataTypes.VARCHAR(10)
+                            },
+                            new String[] {"k", "v1", "v2", "v3"});
+            waitForResult(
+                    Arrays.asList(
+                            "+I[1, 1.1, 1, NULL]",
+                            "+I[2, 2.2, 2, NULL]",
+                            "+I[3, 3.3, NULL, db2_3]",
+                            "+I[4, 4.4, NULL, db2_4]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test that schema_shard_2.t3 won't be synchronized
+            statement.executeUpdate(
+                    "INSERT INTO schema_shard_2.t3 VALUES (1, 'db2_1'), (2, 'db2_2')");
+            statement.executeUpdate(
+                    "INSERT INTO schema_shard_1.t3 VALUES (3, 'db1_3'), (4, 'db1_4')");
+            table = getFileStoreTable("t3");
+            rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                            new String[] {"k", "v1"});
+            waitForResult(
+                    Arrays.asList("+I[3, db1_3]", "+I[4, db1_4]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test newly created table
+            if (mode == COMBINED) {
+                statement.executeUpdate(
+                        "CREATE TABLE schema_shard_1.t4 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+                statement.executeUpdate("INSERT INTO schema_shard_1.t4 VALUES (1, 'db1_1')");
+
+                statement.executeUpdate(
+                        "CREATE TABLE schema_shard_2.t4 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+                statement.executeUpdate("INSERT INTO schema_shard_2.t4 VALUES (2, 'db2_2')");
+
+                waitingTables("t4");
+
+                table = getFileStoreTable("t4");
+                rowType =
+                        RowType.of(
+                                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                                new String[] {"k", "v1"});
+                waitForResult(
+                        Arrays.asList("+I[1, db1_1]", "+I[2, db2_2]"),
+                        table,
+                        rowType,
+                        Collections.singletonList("k"));
+            }
+        }
+    }
+
+    @Test
+    @Timeout(60)
+    public void testSyncMultipleShardsWithoutMerging() throws Exception {
+        Map<String, String> PostgresConfig = getBasicPostgresConfig();
+        PostgresConfig.put("schema-name", "without_merging_shard_.*");
+
+        MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(PostgresConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .mergeShards(false)
+                        .withMode(mode.configString())
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            Thread.sleep(5_000);
+
+            assertExactlyExistTables(
+                    "paimon_sync_database_without_merging_shard_1_t1",
+                    "paimon_sync_database_without_merging_shard_1_t2",
+                    "paimon_sync_database_without_merging_shard_2_t1");
+
+            // test insert into without_merging_shard_1.t1
+            statement.executeUpdate(
+                    "INSERT INTO without_merging_shard_1.t1 VALUES (1, 'db1_1'), (2, 'db1_2')");
+            FileStoreTable table =
+                    getFileStoreTable("paimon_sync_database_without_merging_shard_1_t1");
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                            new String[] {"k", "v1"});
+            waitForResult(
+                    Arrays.asList("+I[1, db1_1]", "+I[2, db1_2]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test insert into without_merging_shard_2.t1
+            statement.executeUpdate(
+                    "INSERT INTO without_merging_shard_2.t1 VALUES (3, 'db2_3', 300), (4, 'db2_4', 400)");
+            table = getFileStoreTable("paimon_sync_database_without_merging_shard_2_t1");
+            rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT()
+                            },
+                            new String[] {"k", "v1", "v2"});
+            waitForResult(
+                    Arrays.asList("+I[3, db2_3, 300]", "+I[4, db2_4, 400]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test schema evolution of without_merging_shard_1.t2
+            statement.executeUpdate("ALTER TABLE without_merging_shard_1.t2 ADD COLUMN v2 DOUBLE");
+            statement.executeUpdate(
+                    "INSERT INTO without_merging_shard_1.t2 VALUES (1, 'Apache', 1.1)");
+            statement.executeUpdate(
+                    "INSERT INTO without_merging_shard_1.t2 VALUES (2, 'Paimon', 2.2)");
+            table = getFileStoreTable("without_merging_shard_1_t2");
+            rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.DOUBLE()
+                            },
+                            new String[] {"k", "v1", "v2"});
+            waitForResult(
+                    Arrays.asList("+I[1, Apache, 1.1]", "+I[2, Paimon, 2.2]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test newly created table
+            if (mode == COMBINED) {
+                statement.executeUpdate(
+                        "CREATE TABLE without_merging_shard_1.t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+                statement.executeUpdate(
+                        "INSERT INTO without_merging_shard_1.t3 VALUES (1, 'test')");
+
+                statement.executeUpdate(
+                        "CREATE TABLE without_merging_shard_2.t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+                statement.executeUpdate(
+                        "INSERT INTO without_merging_shard_2.t3 VALUES (2, 'test')");
+
+                waitingTables(
+                        "paimon_sync_database_without_merging_shard_1_t3",
+                        "paimon_sync_database_without_merging_shard_2_t3");
+
+                table = getFileStoreTable("paimon_sync_database_without_merging_shard_1_t3");
+                rowType =
+                        RowType.of(
+                                new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+                                new String[] {"k", "v1"});
+                waitForResult(
+                        Collections.singletonList("+I[1, test]"),
+                        table,
+                        rowType,
+                        Collections.singletonList("k"));
+
+                table = getFileStoreTable("paimon_sync_database_without_merging_shard_2_t3");
+                waitForResult(
+                        Collections.singletonList("+I[2, test]"),
+                        table,
+                        rowType,
+                        Collections.singletonList("k"));
+            }
+        }
+    }
+
+    @Test
+    public void testMonitoredAndExcludedTablesWithMerging() throws Exception {
+        // create an incompatible table named t2
+        createFileStoreTable(
+                "t2",
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
+                        new String[] {"k", "v1"}),
+                Collections.emptyList(),
+                Collections.singletonList("k"),
+                Collections.emptyMap());
+
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("schema-name", "monitored_and_excluded_shard_.*");
+
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .ignoreIncompatible(true)
+                        .withMode(COMBINED.configString())
+                        .build();
+        action.build();
+
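+        // build() plans the synchronization, so the monitored and excluded table
+        // lists can be inspected without actually executing the job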
+        assertThat(action.monitoredTables())
+                .containsOnly(
+                        Pair.of(
+                                Identifier.create(DATABASE_NAME, "t1"),
+                                "monitored_and_excluded_shard_1"),
+                        Pair.of(
+                                Identifier.create(DATABASE_NAME, "t3"),
+                                "monitored_and_excluded_shard_1"),
+                        Pair.of(
+                                Identifier.create(DATABASE_NAME, "t1"),
+                                "monitored_and_excluded_shard_2"));
+
+        assertThat(action.excludedTables())
+                .containsOnly(
+                        // t2's merged schema is incompatible with the pre-created Paimon
+                        // table, so all of its shards are excluded
+                        Pair.of(
+                                Identifier.create(DATABASE_NAME, "t2"),
+                                "monitored_and_excluded_shard_1"),
+                        Pair.of(
+                                Identifier.create(DATABASE_NAME, "t2"),
+                                "monitored_and_excluded_shard_2"),
+                        // t3 has no primary key in shard 2, so that shard is excluded
+                        Pair.of(
+                                Identifier.create(DATABASE_NAME, "t3"),
+                                "monitored_and_excluded_shard_2"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testNewlyAddedTablesOptionsChange() throws Exception {
+        String schemaName = "newly_added_tables_options_change";
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            statement.execute("SET search_path TO " + schemaName);
+            statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
+            statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
+        }
+
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        postgresConfig.put("schema-name", schemaName);
+        Map<String, String> tableConfig = new HashMap<>();
+        tableConfig.put("bucket", "1");
+        tableConfig.put("sink.parallelism", "1");
+
+        PostgresSyncDatabaseAction action1 =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withTableConfig(tableConfig)
+                        .withMode(COMBINED.configString())
+                        .build();
+
+        JobClient jobClient = runActionWithDefaultEnv(action1);
+
+        waitingTables("t1");
+        jobClient.cancel();
+
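+        // change the table options before restarting the sync job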
+        tableConfig.put("sink.savepoint.auto-tag", "true");
+        tableConfig.put("tag.num-retained-max", "5");
+        tableConfig.put("tag.automatic-creation", "process-time");
+        tableConfig.put("tag.creation-period", "hourly");
+        tableConfig.put("tag.creation-delay", "600000");
+        tableConfig.put("snapshot.time-retained", "1h");
+        tableConfig.put("snapshot.num-retained.min", "5");
+        tableConfig.put("snapshot.num-retained.max", "10");
+        tableConfig.put("changelog-producer", "input");
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            statement.execute("SET search_path TO " + "newly_added_tables_option_schange");
+            statement.executeUpdate("CREATE TABLE t2 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+            statement.executeUpdate("INSERT INTO t2 VALUES (1, 'Hi')");
+        }
+
+        PostgresSyncDatabaseAction action2 =
+                syncDatabaseActionBuilder(postgresConfig).withTableConfig(tableConfig).build();
+        runActionWithDefaultEnv(action2);
+        waitingTables("t2");
+
+        Map<String, String> tableOptions = getFileStoreTable("t2").options();
+        assertThat(tableOptions).containsAllEntriesOf(tableConfig).containsKey("path");
+    }
+
+    @Test
+    public void testCatalogAndTableConfig() {
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(getBasicPostgresConfig())
+                        .withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value"))
+                        .withTableConfig(Collections.singletonMap("table-key", "table-value"))
+                        .build();
+
+        assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
+        assertThat(action.tableConfig())
+                .containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
+    }
+
+    @Test
+    @Timeout(60)
+    public void testMetadataColumns() throws Exception {
+        Map<String, String> postgresConfig = getBasicPostgresConfig();
+        String schemaName = "metadata";
+        postgresConfig.put("schema-name", schemaName);
+
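+        // randomly cover both sink modes; table_name and database_name are synced as metadata columns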
+        MultiTablesSinkMode mode = ThreadLocalRandom.current().nextBoolean() ? DIVIDED : COMBINED;
+        PostgresSyncDatabaseAction action =
+                syncDatabaseActionBuilder(postgresConfig)
+                        .withTableConfig(getBasicTableConfig())
+                        .withMode(mode.configString())
+                        .withMetadataColumn(Arrays.asList("table_name", "database_name"))
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement(DATABASE_NAME)) {
+            statement.executeUpdate("INSERT INTO metadata.t1 VALUES (1, 'db1_1')");
+            statement.executeUpdate("INSERT INTO metadata.t1 VALUES (2, 'db1_2')");
+
+            statement.executeUpdate("INSERT INTO metadata.t1 VALUES (3, 'db2_3')");
+            statement.executeUpdate("INSERT INTO metadata.t1 VALUES (4, 'db2_4')");
+
+            FileStoreTable table = getFileStoreTable("t1");
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(10),
+                                DataTypes.STRING().notNull(),
+                                DataTypes.STRING().notNull()
+                            },
+                            new String[] {"k", "v1", "table_name", "database_name"});
+            waitForResult(
+                    Arrays.asList(
+                            "+I[1, db1_1, t1, metadata]",
+                            "+I[2, db1_2, t1, metadata]",
+                            "+I[3, db2_3, t1, metadata]",
+                            "+I[4, db2_4, t1, metadata]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            statement.executeUpdate("INSERT INTO metadata.t2 VALUES (1, 'db1_1')");
+            statement.executeUpdate("INSERT INTO metadata.t2 VALUES (2, 'db1_2')");
+            statement.executeUpdate("INSERT INTO metadata.t2 VALUES (3, 'db1_3')");
+            statement.executeUpdate("INSERT INTO metadata.t2 VALUES (4, 'db1_4')");
+            table = getFileStoreTable("t2");
+            rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(10),
+                                DataTypes.STRING().notNull(),
+                                DataTypes.STRING().notNull()
+                            },
+                            new String[] {"k", "v1", "table_name", "database_name"});
+            waitForResult(
+                    Arrays.asList(
+                            "+I[1, db1_1, t2, metadata]",
+                            "+I[2, db1_2, t2, metadata]",
+                            "+I[3, db1_3, t2, metadata]",
+                            "+I[4, db1_4, t2, metadata]"),
+                    table,
+                    rowType,
+                    Collections.singletonList("k"));
+
+            // test newly created table
+            if (mode == COMBINED) {
+                statement.execute("SET search_path TO " + "metadata");
+                statement.executeUpdate("CREATE TABLE t3 (k INT, v1 VARCHAR(10), PRIMARY KEY (k))");
+                statement.executeUpdate("INSERT INTO t3 VALUES (1, 'Hi')");
+                waitingTables("t3");
+                table = getFileStoreTable("t3");
+                waitForResult(
+                        Collections.singletonList("+I[1, Hi, t3, metadata]"),
+                        table,
+                        rowType,
+                        Collections.singletonList("k"));
+            }
+        }
+    }
+
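+    /**
+     * Helper job that creates a new table named "t" + ith and inserts the given records, so tests
+     * can add tables while a sync job is running.
+     */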
+    private class SyncNewTableJob implements Runnable {
+
+        private final int ith;
+        private final Statement statement;
+        private final List<Tuple2<Integer, String>> records;
+
+        SyncNewTableJob(int ith, Statement statement, List<Tuple2<Integer, String>> records) {
+            this.ith = ith;
+            this.statement = statement;
+            this.records = records;
+        }
+
+        @Override
+        public void run() {
+            String newTableName = "t" + ith;
+            try {
+                createNewTable(statement, newTableName);
+                String sql =
+                        String.format(
+                                "INSERT INTO %s VALUES %s",
+                                newTableName,
+                                records.stream()
+                                        .map(
+                                                tuple ->
+                                                        String.format(
+                                                                "(%d, '%s')", tuple.f0, tuple.f1))
+                                        .collect(Collectors.joining(", ")));
+                statement.executeUpdate(sql);
+            } catch (SQLException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_database_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_database_setup.sql
new file mode 100644
index 0000000000000..d31e0ae472075
--- /dev/null
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/postgres/sync_database_setup.sql
@@ -0,0 +1,479 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- In production you would almost certainly restrict the replication user to specific
+-- hosts (for example via pg_hba.conf) to prevent other clients from reading the
+-- replication stream. In this test database, however, we simply give the test user
+-- 'paimonuser' superuser and replication privileges:
+ALTER ROLE paimonuser WITH SUPERUSER REPLICATION;
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase
+-- ################################################################################
+
+CREATE DATABASE paimon_sync_database;
+
+
+CREATE SCHEMA paimon_sync_schema;
+SET search_path TO paimon_sync_schema;
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+-- no primary key, should be ignored
+CREATE TABLE t3 (
+    v1 INT
+);
+
+-- a second schema, to make sure we use the JDBC driver correctly
+CREATE SCHEMA paimon_sync_schema1;
+SET search_path TO paimon_sync_schema1;
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+-- no primary key, should be ignored
+CREATE TABLE t3 (
+    v1 INT
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testIgnoreIncompatibleTables
+-- ################################################################################
+
+CREATE SCHEMA paimon_sync_schema_ignore_incompatible;
+SET search_path TO paimon_sync_schema_ignore_incompatible;
+
+CREATE TABLE incompatible (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE compatible (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testTableAffix
+-- ################################################################################
+
+CREATE SCHEMA paimon_sync_schema_affix;
+SET search_path TO paimon_sync_schema_affix;
+
+CREATE TABLE t1 (
+    k1 INT,
+    v0 VARCHAR(10),
+    PRIMARY KEY (k1)
+);
+
+CREATE TABLE t2 (
+    k2 INT,
+    v0 VARCHAR(10),
+    PRIMARY KEY (k2)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testIncludingTables
+-- ################################################################################
+
+CREATE SCHEMA paimon_sync_schema_including;
+SET search_path TO paimon_sync_schema_including;
+
+CREATE TABLE paimon_1 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE ignored (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testExcludingTables
+-- ################################################################################
+
+CREATE SCHEMA paimon_sync_schema_excluding;
+SET search_path TO paimon_sync_schema_excluding;
+CREATE TABLE paimon_1 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE sync (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testIncludingAndExcludingTables
+-- ################################################################################
+
+CREATE SCHEMA paimon_sync_schema_in_excluding;
+SET search_path TO paimon_sync_schema_in_excluding;
+CREATE TABLE paimon_1 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE paimon_2 (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE flink (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE test (
+    k INT,
+    PRIMARY KEY (k)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testIgnoreCase
+-- ################################################################################
+
+CREATE SCHEMA paimon_ignore_CASE;
+SET search_path TO paimon_ignore_CASE;
+
+CREATE TABLE T (
+    k INT,
+    UPPERCASE_V0 VARCHAR(20),
+    PRIMARY KEY (k)
+);
+
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testNewlyAddedTables
+-- ################################################################################
+
+CREATE SCHEMA paimon_sync_schema_newly_added_tables;
+SET search_path TO paimon_sync_schema_newly_added_tables;
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+CREATE SCHEMA paimon_sync_schema_newly_added_tables_1;
+SET search_path TO paimon_sync_schema_newly_added_tables_1;
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+
+CREATE SCHEMA paimon_sync_schema_newly_added_tables_2;
+SET search_path TO paimon_sync_schema_newly_added_tables_2;
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+
+CREATE SCHEMA paimon_sync_schema_newly_added_tables_3;
+SET search_path TO paimon_sync_schema_newly_added_tables_3;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+
+CREATE SCHEMA paimon_sync_schema_newly_added_tables_4;
+SET search_path TO paimon_sync_schema_newly_added_tables_4;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k1 INT,
+    k2 VARCHAR(10),
+    v1 INT,
+    v2 BIGINT,
+    PRIMARY KEY (k1, k2)
+);
+
+CREATE SCHEMA paimon_sync_schema_add_ignored_table;
+SET search_path TO paimon_sync_schema_add_ignored_table;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE a (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE SCHEMA many_table_sync_test;
+SET search_path TO many_table_sync_test;
+
+CREATE TABLE a (
+    k INT,
+    v VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testSyncMultipleShards
+-- ################################################################################
+
+CREATE SCHEMA schema_shard_1;
+SET search_path TO schema_shard_1;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k BIGINT,
+    v1 DOUBLE,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t3 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE SCHEMA schema_shard_2;
+SET search_path TO schema_shard_2;
+
+-- test schema merging
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(20),
+    v2 BIGINT,
+    PRIMARY KEY (k)
+);
+
+-- test schema evolution
+CREATE TABLE t2 (
+    k BIGINT,
+    v1 DOUBLE,
+    PRIMARY KEY (k)
+);
+
+-- test the case where some shard doesn't have a primary key
+CREATE TABLE t3 (
+    k INT,
+    v1 VARCHAR(10)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testSyncMultipleShardsWithoutMerging
+-- ################################################################################
+
+CREATE SCHEMA without_merging_shard_1;
+SET search_path TO without_merging_shard_1;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE SCHEMA without_merging_shard_2;
+SET search_path TO without_merging_shard_2;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(20),
+    v2 BIGINT,
+    PRIMARY KEY (k)
+);
+
+-- test the case where some shard doesn't have a primary key
+CREATE TABLE t2 (
+    k INT,
+    v1 VARCHAR(10)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testMonitoredAndExcludedTablesWithMerging
+-- ################################################################################
+
+CREATE SCHEMA monitored_and_excluded_shard_1;
+SET search_path TO monitored_and_excluded_shard_1;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t3 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+
+CREATE SCHEMA monitored_and_excluded_shard_2;
+SET search_path TO monitored_and_excluded_shard_2;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k INT,
+    v2 DOUBLE,
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t3 (
+    k INT,
+    v2 VARCHAR(10)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testNewlyAddedTablesOptionsChange
+-- ################################################################################
+
+CREATE SCHEMA newly_added_tables_options_change;
+SET search_path TO newly_added_tables_options_change;
+
+CREATE TABLE t1 (
+   k INT,
+   v1 VARCHAR(10),
+   PRIMARY KEY (k)
+);
+
+-- ################################################################################
+--  PostgresSyncDatabaseActionITCase#testMetadataColumns
+-- ################################################################################
+
+CREATE SCHEMA metadata;
+SET search_path TO metadata;
+
+CREATE TABLE t1 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);
+
+CREATE TABLE t2 (
+    k INT,
+    v1 VARCHAR(10),
+    PRIMARY KEY (k)
+);