Skip to content

Commit 8d93c51

Browse files
committed
Fixed: renamed JdbcSchemasInfo#toMySqlTableInfos to the generic toTableInfos, and added PostgresSyncDatabaseAction plus PostgresActionUtils table-list building for whole-database Postgres CDC synchronization
1 parent e27ceb4 commit 8d93c51

File tree

11 files changed

+2416
-7
lines changed

11 files changed

+2416
-7
lines changed

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ protected void beforeBuildingSourceSink() throws Exception {
127127
typeMapping);
128128

129129
logNonPkTables(mySqlSchemasInfo.nonPkTables());
130-
List<JdbcTableInfo> jdbcTableInfos = mySqlSchemasInfo.toMySqlTableInfos(mergeShards);
130+
List<JdbcTableInfo> jdbcTableInfos = mySqlSchemasInfo.toTableInfos(mergeShards);
131131

132132
checkArgument(
133133
!jdbcTableInfos.isEmpty(),

paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresActionUtils.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919
package org.apache.paimon.flink.action.cdc.postgres;
2020

2121
import org.apache.paimon.catalog.Identifier;
22+
import org.apache.paimon.flink.action.MultiTablesSinkMode;
2223
import org.apache.paimon.flink.action.cdc.TypeMapping;
2324
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemaUtils;
2425
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
2526
import org.apache.paimon.options.OptionsUtils;
2627
import org.apache.paimon.schema.Schema;
28+
import org.apache.paimon.utils.Pair;
2729

2830
import com.ververica.cdc.connectors.base.options.StartupOptions;
2931
import com.ververica.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
@@ -46,7 +48,10 @@
4648
import java.util.function.Predicate;
4749
import java.util.regex.Matcher;
4850
import java.util.regex.Pattern;
51+
import java.util.stream.Collectors;
4952

53+
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
54+
import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
5055
import static org.apache.paimon.flink.action.cdc.postgres.PostgresTypeUtils.toPaimonTypeVisitor;
5156

5257
/** Utils for Postgres Action. */
@@ -69,7 +74,7 @@ static Connection getConnection(Configuration postgresConfig) throws Exception {
6974
public static JdbcSchemasInfo getPostgresTableInfos(
7075
Configuration postgresConfig,
7176
Predicate<String> monitorTablePredication,
72-
List<Identifier> excludedTables,
77+
List<Pair<Identifier, String>> excludedTables,
7378
TypeMapping typeMapping)
7479
throws Exception {
7580

@@ -105,7 +110,7 @@ public static JdbcSchemasInfo getPostgresTableInfos(
105110
toPaimonTypeVisitor());
106111
jdbcSchemasInfo.addSchema(identifier, schemaName, schema);
107112
} else {
108-
excludedTables.add(identifier);
113+
excludedTables.add(Pair.of(identifier, schemaName));
109114
}
110115
}
111116
}
@@ -175,6 +180,49 @@ public static JdbcIncrementalSource<String> buildPostgresSource(
175180
return sourceBuilder.deserializer(schema).includeSchemaChanges(true).build();
176181
}
177182

183+
public static String tableList(
184+
MultiTablesSinkMode mode,
185+
String schemaPattern,
186+
String includingTablePattern,
187+
List<Pair<Identifier, String>> monitoredTables,
188+
List<Pair<Identifier, String>> excludedTables) {
189+
if (mode == DIVIDED) {
190+
return dividedModeTableList(monitoredTables);
191+
} else if (mode == COMBINED) {
192+
return combinedModeTableList(schemaPattern, includingTablePattern, excludedTables);
193+
}
194+
throw new UnsupportedOperationException("Unknown MultiTablesSinkMode: " + mode);
195+
}
196+
197+
private static String dividedModeTableList(List<Pair<Identifier, String>> monitoredTables) {
198+
// In DIVIDED mode, we only concern about existed tables
199+
return monitoredTables.stream()
200+
.map(t -> t.getRight() + "\\." + t.getLeft().getObjectName())
201+
.collect(Collectors.joining("|"));
202+
}
203+
204+
public static String combinedModeTableList(
205+
String schemaPattern,
206+
String includingTablePattern,
207+
List<Pair<Identifier, String>> excludedTables) {
208+
String includingPattern =
209+
String.format("(%s)\\.(%s)", schemaPattern, includingTablePattern);
210+
if (excludedTables.isEmpty()) {
211+
return includingPattern;
212+
}
213+
214+
String excludingPattern =
215+
excludedTables.stream()
216+
.map(
217+
t ->
218+
String.format(
219+
"(^%s$)",
220+
t.getRight() + "\\." + t.getLeft().getObjectName()))
221+
.collect(Collectors.joining("|"));
222+
excludingPattern = "?!" + excludingPattern;
223+
return String.format("(%s)(%s)", excludingPattern, includingPattern);
224+
}
225+
178226
public static void registerJdbcDriver() {
179227
try {
180228
Class.forName("org.postgresql.Driver");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.flink.action.cdc.postgres;
20+
21+
import org.apache.paimon.catalog.Catalog;
22+
import org.apache.paimon.catalog.Identifier;
23+
import org.apache.paimon.flink.action.Action;
24+
import org.apache.paimon.flink.action.cdc.CdcActionCommonUtils;
25+
import org.apache.paimon.flink.action.cdc.SyncDatabaseActionBase;
26+
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
27+
import org.apache.paimon.flink.action.cdc.TableNameConverter;
28+
import org.apache.paimon.flink.action.cdc.schema.JdbcSchemasInfo;
29+
import org.apache.paimon.flink.action.cdc.schema.JdbcTableInfo;
30+
import org.apache.paimon.schema.Schema;
31+
import org.apache.paimon.schema.TableSchema;
32+
import org.apache.paimon.table.FileStoreTable;
33+
import org.apache.paimon.utils.Pair;
34+
35+
import com.ververica.cdc.connectors.postgres.source.config.PostgresSourceOptions;
36+
import org.slf4j.Logger;
37+
import org.slf4j.LoggerFactory;
38+
39+
import javax.annotation.Nullable;
40+
41+
import java.util.ArrayList;
42+
import java.util.Collections;
43+
import java.util.HashMap;
44+
import java.util.HashSet;
45+
import java.util.List;
46+
import java.util.Map;
47+
import java.util.Set;
48+
import java.util.function.Supplier;
49+
import java.util.regex.Pattern;
50+
import java.util.stream.Collectors;
51+
52+
import static org.apache.paimon.flink.action.MultiTablesSinkMode.DIVIDED;
53+
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.schemaCompatible;
54+
55+
/** An {@link Action} which synchronize the whole Postgresql database into one Paimon database. */
56+
public class PostgresSyncDatabaseAction extends SyncDatabaseActionBase {
57+
58+
private static final Logger LOG = LoggerFactory.getLogger(PostgresSyncDatabaseAction.class);
59+
private boolean ignoreIncompatible = false;
60+
private final List<Pair<Identifier, String>> monitoredTables = new ArrayList<>();
61+
private final List<Pair<Identifier, String>> excludedTables = new ArrayList<>();
62+
63+
public PostgresSyncDatabaseAction(
64+
String warehouse,
65+
String database,
66+
Map<String, String> catalogConfig,
67+
Map<String, String> postgresConfig) {
68+
super(
69+
warehouse,
70+
database,
71+
catalogConfig,
72+
postgresConfig,
73+
SyncJobHandler.SourceType.POSTGRES);
74+
this.mode = DIVIDED;
75+
}
76+
77+
public List<Pair<Identifier, String>> monitoredTables() {
78+
return monitoredTables;
79+
}
80+
81+
public List<Pair<Identifier, String>> excludedTables() {
82+
return excludedTables;
83+
}
84+
85+
public PostgresSyncDatabaseAction ignoreIncompatible(boolean ignoreIncompatible) {
86+
this.ignoreIncompatible = ignoreIncompatible;
87+
return this;
88+
}
89+
90+
@Override
91+
protected void beforeBuildingSourceSink() throws Exception {
92+
Pattern includingPattern = Pattern.compile(includingTables);
93+
Pattern excludingPattern =
94+
excludingTables == null ? null : Pattern.compile(excludingTables);
95+
JdbcSchemasInfo jdbcSchemasInfo =
96+
PostgresActionUtils.getPostgresTableInfos(
97+
cdcSourceConfig,
98+
tableName ->
99+
shouldMonitorTable(tableName, includingPattern, excludingPattern),
100+
excludedTables,
101+
typeMapping);
102+
103+
logNonPkTables(jdbcSchemasInfo);
104+
105+
Map<String, Set<String>> schemaMappings = getSchemaMapping(jdbcSchemasInfo);
106+
107+
List<JdbcTableInfo> postgresTableInfos = jdbcSchemasInfo.toTableInfos(mergeShards);
108+
109+
TableNameConverter tableNameConverter =
110+
new TableNameConverter(caseSensitive, mergeShards, tablePrefix, tableSuffix);
111+
for (JdbcTableInfo tableInfo : postgresTableInfos) {
112+
Identifier identifier =
113+
Identifier.create(
114+
database, tableNameConverter.convert(tableInfo.toPaimonTableName()));
115+
FileStoreTable table;
116+
Schema fromPostgres =
117+
CdcActionCommonUtils.buildPaimonSchema(
118+
identifier.getFullName(),
119+
Collections.emptyList(),
120+
Collections.emptyList(),
121+
Collections.emptyList(),
122+
tableConfig,
123+
tableInfo.schema(),
124+
metadataConverters,
125+
caseSensitive,
126+
true);
127+
try {
128+
table = (FileStoreTable) catalog.getTable(identifier);
129+
table = alterTableOptions(identifier, table);
130+
Supplier<String> errMsg =
131+
incompatibleMessage(table.schema(), tableInfo, identifier);
132+
133+
if (shouldMonitorTable(table.schema(), fromPostgres, errMsg)) {
134+
tables.add(table);
135+
setTables(schemaMappings, tableInfo.identifiers(), monitoredTables);
136+
} else {
137+
setTables(schemaMappings, tableInfo.identifiers(), excludedTables);
138+
}
139+
} catch (Catalog.TableNotExistException e) {
140+
catalog.createTable(identifier, fromPostgres, false);
141+
table = (FileStoreTable) catalog.getTable(identifier);
142+
tables.add(table);
143+
setTables(schemaMappings, tableInfo.identifiers(), monitoredTables);
144+
}
145+
}
146+
}
147+
148+
private void setTables(
149+
Map<String, Set<String>> schemaMappings,
150+
List<Identifier> identifiers,
151+
List<Pair<Identifier, String>> tables) {
152+
identifiers.stream()
153+
.forEach(
154+
item -> {
155+
Set<String> schemas = schemaMappings.get(item.getFullName());
156+
schemas.stream()
157+
.forEach(
158+
schemaName -> {
159+
tables.add(Pair.of(item, schemaName));
160+
});
161+
});
162+
}
163+
164+
private static Map<String, Set<String>> getSchemaMapping(JdbcSchemasInfo jdbcSchemasInfo) {
165+
Map<String, Set<String>> schemaMapping = new HashMap<>();
166+
List<JdbcSchemasInfo.JdbcSchemaInfo> jdbcSchemaInfos = jdbcSchemasInfo.schemaInfos();
167+
for (JdbcSchemasInfo.JdbcSchemaInfo jdbcSchemaInfo : jdbcSchemaInfos) {
168+
if (!jdbcSchemaInfo.isPkTable()) {
169+
continue;
170+
}
171+
String fullName = jdbcSchemaInfo.identifier().getFullName();
172+
if (!schemaMapping.containsKey(fullName)) {
173+
Set<String> schemaNames = new HashSet<>();
174+
schemaNames.add(jdbcSchemaInfo.schemaName());
175+
schemaMapping.put(fullName, schemaNames);
176+
} else {
177+
Set<String> existsSchemas = schemaMapping.get(fullName);
178+
existsSchemas.add(jdbcSchemaInfo.schemaName());
179+
}
180+
}
181+
return schemaMapping;
182+
}
183+
184+
@Override
185+
protected Object buildSource() {
186+
try {
187+
return PostgresActionUtils.buildPostgresSource(
188+
cdcSourceConfig,
189+
// todo
190+
new String[] {cdcSourceConfig.get(PostgresSourceOptions.SCHEMA_NAME)},
191+
new String[] {
192+
PostgresActionUtils.tableList(
193+
mode,
194+
cdcSourceConfig.get(PostgresSourceOptions.SCHEMA_NAME),
195+
includingTables,
196+
monitoredTables,
197+
excludedTables)
198+
});
199+
} catch (Exception e) {
200+
throw new RuntimeException(e);
201+
}
202+
}
203+
204+
private Supplier<String> incompatibleMessage(
205+
TableSchema paimonSchema, JdbcTableInfo tableInfo, Identifier identifier) {
206+
return () ->
207+
String.format(
208+
"Incompatible schema found.\n"
209+
+ "Paimon table is: %s, fields are: %s.\n"
210+
+ "Postgres table is: %s, fields are: %s.\n",
211+
identifier.getFullName(),
212+
paimonSchema.fields(),
213+
tableInfo.location(),
214+
tableInfo.schema().fields());
215+
}
216+
217+
private void logNonPkTables(JdbcSchemasInfo jdbcSchemasInfo) {
218+
List<Identifier> nonPkTables = jdbcSchemasInfo.nonPkTables();
219+
if (!nonPkTables.isEmpty()) {
220+
LOG.debug(
221+
"Didn't find primary keys for tables '{}'. "
222+
+ "These tables won't be synchronized.",
223+
nonPkTables.stream()
224+
.map(Identifier::getFullName)
225+
.collect(Collectors.joining(",")));
226+
jdbcSchemasInfo.schemaInfos().stream()
227+
.forEach(
228+
jdbcSchemaInfo -> {
229+
if (!jdbcSchemaInfo.isPkTable()) {
230+
excludedTables.add(
231+
Pair.of(
232+
jdbcSchemaInfo.identifier(),
233+
jdbcSchemaInfo.schemaName()));
234+
}
235+
});
236+
}
237+
}
238+
239+
private boolean shouldMonitorTable(
240+
String tableName, Pattern includingPattern, @Nullable Pattern excludingPattern) {
241+
boolean shouldMonitor = includingPattern.matcher(tableName).matches();
242+
if (excludingPattern != null) {
243+
shouldMonitor = shouldMonitor && !excludingPattern.matcher(tableName).matches();
244+
}
245+
if (!shouldMonitor) {
246+
LOG.debug("Source table '{}' is excluded.", tableName);
247+
}
248+
return shouldMonitor;
249+
}
250+
251+
private boolean shouldMonitorTable(
252+
TableSchema tableSchema, Schema schema, Supplier<String> errMsg) {
253+
if (schemaCompatible(tableSchema, schema.fields())) {
254+
return true;
255+
} else if (ignoreIncompatible) {
256+
LOG.warn(errMsg.get() + "This table will be ignored.");
257+
return false;
258+
} else {
259+
throw new IllegalArgumentException(
260+
errMsg.get()
261+
+ "If you want to ignore the incompatible tables, please specify --ignore-incompatible to true.");
262+
}
263+
}
264+
}

0 commit comments

Comments
 (0)