Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Apr 17, 2024
1 parent ad40e4a commit 02e663d
Show file tree
Hide file tree
Showing 5 changed files with 1,899 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ public PostgresSyncDatabaseAction(
this.mode = DIVIDED;
}

public List<Pair<Identifier, String>> monitoredTables() {
return monitoredTables;
}

public List<Pair<Identifier, String>> excludedTables() {
return excludedTables;
}

public PostgresSyncDatabaseAction ignoreIncompatible(boolean ignoreIncompatible) {
this.ignoreIncompatible = ignoreIncompatible;
return this;
Expand Down Expand Up @@ -179,7 +187,7 @@ protected Object buildSource() {
return PostgresActionUtils.buildPostgresSource(
cdcSourceConfig,
// todo
new String[] {},
new String[] {cdcSourceConfig.get(PostgresSourceOptions.SCHEMA_NAME)},
new String[] {
PostgresActionUtils.tableList(
mode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void printHelp() {
"--merge-shards is default true, in this case, if some tables in different databases have the same name, "
+ "their schemas will be merged and their records will be synchronized into one Paimon table. "
+ "Otherwise, each table's records will be synchronized to a corresponding Paimon table, "
+ "and the Paimon table will be named to 'databaseName_tableName' to avoid potential name conflict.");
+ "and the Paimon table will be named to 'databaseName_schemaName_tableName' to avoid potential name conflict.");
System.out.println();

System.out.println(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.postgres;

import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseAction;

import com.ververica.cdc.connectors.postgres.source.PostgresConnectionPoolFactory;
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
Expand Down Expand Up @@ -119,6 +120,11 @@ protected String getSlotName() {
return "paimon_" + id;
}

protected PostgresSyncDatabaseActionBuilder syncDatabaseActionBuilder(
Map<String, String> postgresConfig) {
return new PostgresSyncDatabaseActionBuilder(postgresConfig);
}

protected PostgresSyncTableActionBuilder syncTableActionBuilder(
Map<String, String> postgresConfig) {
return new PostgresSyncTableActionBuilder(postgresConfig);
Expand All @@ -132,4 +138,13 @@ public PostgresSyncTableActionBuilder(Map<String, String> postgresConfig) {
super(PostgresSyncTableAction.class, postgresConfig);
}
}

/** Builder to build {@link MySqlSyncDatabaseAction} from action arguments. */
protected class PostgresSyncDatabaseActionBuilder
extends SyncDatabaseActionBuilder<PostgresSyncDatabaseAction> {

public PostgresSyncDatabaseActionBuilder(Map<String, String> postgresConfig) {
super(PostgresSyncDatabaseAction.class, postgresConfig);
}
}
}
Loading

0 comments on commit 02e663d

Please sign in to comment.