
Commit 95fe4d3

[FLINK-36684][cdc-connector/mysql] Support read changelog as append only mode

This closes #3708

1 parent 851c37e · commit 95fe4d3
9 files changed: +217 −22 lines changed


docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 22 additions & 0 deletions

```diff
@@ -400,6 +400,19 @@ Flink SQL> SELECT * FROM orders;
         This is an experimental feature; defaults to false.
       </td>
     </tr>
+    <tr>
+      <td>scan.read-changelog-as-append-only.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to convert the changelog stream to an append-only stream.<br>
+        Enable this only in special scenarios where delete messages from the upstream table must be retained. For example, in a logical-deletion scenario where physically deleting downstream messages is not allowed, use this feature together with the row_kind metadata field: the downstream can first store all detail data and then use the row_kind field to decide whether to apply a logical deletion.<br>
+        The option values are as follows:<br>
+        <li>true: messages of all types (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted into INSERT messages.</li>
+        <li>false (default): messages of all types are emitted unchanged.</li>
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
@@ -433,6 +446,13 @@ Flink SQL> SELECT * FROM orders;
       <td>TIMESTAMP_LTZ(3) NOT NULL</td>
       <td>The time when the record was changed in the database.<br>If the record was read from the table's snapshot instead of the binlog, the value is always 0.</td>
     </tr>
+    <tr>
+      <td>row_kind</td>
+      <td>STRING NOT NULL</td>
+      <td>The change type of the current record.<br>
+      Note: if the source operator emits a row_kind column for every record, downstream SQL operators may fail to compare rows during retraction because of this newly added column, which can lead to non-deterministic update issues. It is recommended to use this metadata column only in simple synchronization jobs.<br>
+      '+I' stands for an INSERT message, '-D' for a DELETE message, '-U' for an UPDATE_BEFORE message, and '+U' for an UPDATE_AFTER message.</td>
+    </tr>
     </tbody>
 </table>

@@ -442,6 +462,7 @@ CREATE TABLE products (
     db_name STRING METADATA FROM 'database_name' VIRTUAL,
     table_name STRING METADATA FROM 'table_name' VIRTUAL,
     operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation STRING METADATA FROM 'row_kind' VIRTUAL,
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
@@ -466,6 +487,7 @@ CREATE TABLE products (
     db_name STRING METADATA FROM 'database_name' VIRTUAL,
     table_name STRING METADATA FROM 'table_name' VIRTUAL,
     operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
+    operation STRING METADATA FROM 'row_kind' VIRTUAL,
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
```
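Taken together, the option and the row_kind metadata column give the soft-delete pipeline the docs describe. A minimal end-to-end sketch (the connection settings and the orders schema are illustrative placeholders, not part of this commit):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AppendOnlyCdcExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Source: every change record arrives as +I; the original change
        // type is preserved in the 'operation' metadata column.
        tEnv.executeSql(
                "CREATE TABLE orders_src ("
                        + "  operation STRING METADATA FROM 'row_kind' VIRTUAL,"
                        + "  order_id INT,"
                        + "  customer_name STRING,"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'hostname' = 'localhost'," // placeholder
                        + "  'port' = '3306',"
                        + "  'username' = 'flinkuser'," // placeholder
                        + "  'password' = 'flinkpw',"   // placeholder
                        + "  'database-name' = 'mydb',"
                        + "  'table-name' = 'orders',"
                        + "  'scan.read-changelog-as-append-only.enabled' = 'true'"
                        + ")");
        // Downstream keeps every detail row, including deletes; a later query
        // can filter on operation = '-D' to apply the logical deletion.
    }
}
```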

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 13 additions & 0 deletions

```diff
@@ -426,6 +426,19 @@ During a snapshot operation, the connector will query each included table to pro
         Experimental option, defaults to false.
       </td>
     </tr>
+    <tr>
+      <td>scan.read-changelog-as-append-only.enabled</td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>
+        Whether to convert the changelog stream to an append-only stream.<br>
+        This feature should only be used in special scenarios where upstream table deletion messages need to be retained. For example, in a logical-deletion scenario, users are not allowed to physically delete downstream messages. In that case, use this feature together with the row_kind metadata field, so the downstream can first store all detail data and then use the row_kind field to decide whether to apply a logical deletion.<br>
+        The option values are as follows:<br>
+        <li>true: all types of messages (including INSERT, DELETE, UPDATE_BEFORE, and UPDATE_AFTER) are converted into INSERT messages.</li>
+        <li>false (default): all types of messages are emitted as-is.</li>
+      </td>
+    </tr>
     </tbody>
 </table>
 </div>
```
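Continuing the sketch above, the "use the row_kind field to decide whether to perform logical deletion" step the docs describe could be expressed as a Flink SQL deduplication query over the detail table. All table and column names here (orders_detail, operation, operation_ts) are illustrative assumptions, not part of the commit:

```java
// Hypothetical downstream view: keep the latest row per key and hide rows
// whose most recent change was a delete ('-D'). Reuses the TableEnvironment
// `tEnv` from the previous sketch.
tEnv.executeSql(
        "CREATE TEMPORARY VIEW orders_current AS "
                + "SELECT order_id, customer_name FROM ( "
                + "  SELECT *, ROW_NUMBER() OVER ( "
                + "    PARTITION BY order_id ORDER BY operation_ts DESC) AS rn "
                + "  FROM orders_detail "
                + ") WHERE rn = 1 AND operation <> '-D'");
```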

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/AppendMetadataCollector.java

Lines changed: 14 additions & 1 deletion

```diff
@@ -21,6 +21,7 @@
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Collector;
 
 import org.apache.kafka.connect.source.SourceRecord;
@@ -37,8 +38,15 @@ public final class AppendMetadataCollector implements Collector<RowData>, Serial
     public transient SourceRecord inputRecord;
     public transient Collector<RowData> outputCollector;
 
+    private final boolean appendOnly;
+
     public AppendMetadataCollector(MetadataConverter[] metadataConverters) {
+        this(metadataConverters, false);
+    }
+
+    public AppendMetadataCollector(MetadataConverter[] metadataConverters, boolean appendOnly) {
         this.metadataConverters = metadataConverters;
+        this.appendOnly = appendOnly;
     }
 
     @Override
@@ -55,7 +63,12 @@ public void collect(RowData physicalRow) {
 
             metaRow.setField(i, meta);
         }
-        RowData outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
+        RowData outRow;
+        if (appendOnly) {
+            outRow = new JoinedRowData(RowKind.INSERT, physicalRow, metaRow);
+        } else {
+            outRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);
+        }
         outputCollector.collect(outRow);
     }
 
```
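The collector change is easiest to see in isolation: in append-only mode the emitted JoinedRowData is forced to RowKind.INSERT, while the original change type survives only in the metadata columns. A self-contained sketch using the same Flink row classes (the single string metadata column is an assumption for illustration):

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.types.RowKind;

public class AppendOnlyJoinDemo {
    public static void main(String[] args) {
        // A physical row that originally represents a DELETE.
        GenericRowData physicalRow =
                GenericRowData.ofKind(RowKind.DELETE, 111L, StringData.fromString("user_111"));
        // The metadata row carrying the original change type as a string ('-D').
        GenericRowData metaRow = GenericRowData.of(StringData.fromString("-D"));

        // Append-only mode: the joined row is forced to INSERT.
        RowData appendOnlyRow = new JoinedRowData(RowKind.INSERT, physicalRow, metaRow);
        // Default mode: the joined row keeps the physical row's kind.
        RowData changelogRow = new JoinedRowData(physicalRow.getRowKind(), physicalRow, metaRow);

        System.out.println(appendOnlyRow.getRowKind()); // INSERT
        System.out.println(changelogRow.getRowKind());  // DELETE
    }
}
```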

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/table/RowDataDebeziumDeserializeSchema.java

Lines changed: 12 additions & 3 deletions

```diff
@@ -104,9 +104,10 @@ public static Builder newBuilder() {
             ValueValidator validator,
             ZoneId serverTimeZone,
             DeserializationRuntimeConverterFactory userDefinedConverterFactory,
-            DebeziumChangelogMode changelogMode) {
+            DebeziumChangelogMode changelogMode,
+            boolean appendOnly) {
         this.hasMetadata = checkNotNull(metadataConverters).length > 0;
-        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters);
+        this.appendMetadataCollector = new AppendMetadataCollector(metadataConverters, appendOnly);
         this.physicalConverter =
                 createConverter(
                         checkNotNull(physicalDataType),
@@ -190,6 +191,8 @@ public static class Builder {
                 DeserializationRuntimeConverterFactory.DEFAULT;
         private DebeziumChangelogMode changelogMode = DebeziumChangelogMode.ALL;
 
+        private boolean appendOnly = false;
+
         public Builder setPhysicalRowType(RowType physicalRowType) {
             this.physicalRowType = physicalRowType;
             return this;
@@ -226,6 +229,11 @@ public Builder setChangelogMode(DebeziumChangelogMode changelogMode) {
             return this;
         }
 
+        public Builder setAppendOnly(boolean appendOnly) {
+            this.appendOnly = appendOnly;
+            return this;
+        }
+
         public RowDataDebeziumDeserializeSchema build() {
             return new RowDataDebeziumDeserializeSchema(
                     physicalRowType,
@@ -234,7 +242,8 @@ public RowDataDebeziumDeserializeSchema build() {
                     validator,
                     serverTimeZone,
                     userDefinedConverterFactory,
-                    changelogMode);
+                    changelogMode,
+                    appendOnly);
         }
     }
```
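For DataStream API users, the new builder flag is the entry point. A minimal sketch of constructing the deserializer with append-only output; the two-column physical schema here is a hypothetical example, not taken from the commit:

```java
import java.time.ZoneId;

import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class AppendOnlyDeserializerSketch {
    public static RowDataDebeziumDeserializeSchema build() {
        // Hypothetical physical schema of the monitored table: (id BIGINT, name STRING).
        RowType physicalRowType =
                RowType.of(new BigIntType(), new VarCharType(VarCharType.MAX_LENGTH));
        return RowDataDebeziumDeserializeSchema.newBuilder()
                .setPhysicalRowType(physicalRowType)
                .setResultTypeInfo(InternalTypeInfo.of(physicalRowType))
                .setServerTimeZone(ZoneId.of("UTC"))
                .setAppendOnly(true) // every emitted row carries RowKind.INSERT
                .build();
    }
}
```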

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java

Lines changed: 7 additions & 0 deletions

```diff
@@ -191,6 +191,13 @@ public class MySqlSourceOptions {
                     .withDescription(
                             "Optional interval of sending heartbeat event for tracing the latest available binlog offsets");
 
+    public static final ConfigOption<Boolean> SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED =
+            ConfigOptions.key("scan.read-changelog-as-append-only.enabled")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to convert the changelog data stream to an append-only data stream");
+
     // ----------------------------------------------------------------------------
     // experimental options, won't add them to documentation
     // ----------------------------------------------------------------------------
```
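Like any ConfigOption, the new key can also be set and read programmatically; a small sketch using Flink's Configuration class:

```java
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions;
import org.apache.flink.configuration.Configuration;

public class OptionSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // Typed access to the new option; the default (false) applies when unset.
        config.set(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED, true);
        boolean appendOnly =
                config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
        System.out.println(appendOnly); // true
    }
}
```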

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java

Lines changed: 17 additions & 5 deletions

```diff
@@ -102,6 +102,8 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
     private final boolean useLegacyJsonFormat;
     private final boolean assignUnboundedChunkFirst;
 
+    private final boolean appendOnly;
+
     // --------------------------------------------------------------------------------------------
     // Mutable attributes
     // --------------------------------------------------------------------------------------------
@@ -141,7 +143,8 @@ public MySqlTableSource(
             boolean skipSnapshotBackFill,
             boolean parseOnlineSchemaChanges,
             boolean useLegacyJsonFormat,
-            boolean assignUnboundedChunkFirst) {
+            boolean assignUnboundedChunkFirst,
+            boolean appendOnly) {
         this.physicalSchema = physicalSchema;
         this.port = port;
         this.hostname = checkNotNull(hostname);
@@ -174,11 +177,16 @@ public MySqlTableSource(
         this.skipSnapshotBackFill = skipSnapshotBackFill;
         this.useLegacyJsonFormat = useLegacyJsonFormat;
         this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
+        this.appendOnly = appendOnly;
     }
 
     @Override
     public ChangelogMode getChangelogMode() {
-        return ChangelogMode.all();
+        if (appendOnly) {
+            return ChangelogMode.insertOnly();
+        } else {
+            return ChangelogMode.all();
+        }
     }
 
     @Override
@@ -197,6 +205,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
                         .setServerTimeZone(serverTimeZone)
                         .setUserDefinedConverterFactory(
                                 MySqlDeserializationConverterFactory.instance())
+                        .setAppendOnly(appendOnly)
                         .build();
         if (enableParallelRead) {
             MySqlSource<RowData> parallelSource =
@@ -320,7 +329,8 @@ public DynamicTableSource copy() {
                         skipSnapshotBackFill,
                         parseOnlineSchemaChanges,
                         useLegacyJsonFormat,
-                        assignUnboundedChunkFirst);
+                        assignUnboundedChunkFirst,
+                        appendOnly);
         source.metadataKeys = metadataKeys;
         source.producedDataType = producedDataType;
         return source;
@@ -365,7 +375,8 @@ public boolean equals(Object o) {
                 && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill)
                 && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
                 && useLegacyJsonFormat == that.useLegacyJsonFormat
-                && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst;
+                && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst
+                && Objects.equals(appendOnly, that.appendOnly);
     }
 
     @Override
@@ -401,7 +412,8 @@ public int hashCode() {
                 skipSnapshotBackFill,
                 parseOnlineSchemaChanges,
                 useLegacyJsonFormat,
-                assignUnboundedChunkFirst);
+                assignUnboundedChunkFirst,
+                appendOnly);
     }
 
     @Override
```
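The getChangelogMode() switch is the planner-facing contract: ChangelogMode.all() declares all four row kinds, while insertOnly() declares just INSERT, which is what lets append-only sinks accept the source. A sketch of that equivalence with Flink's ChangelogMode builder:

```java
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;

public class ChangelogModeSketch {
    public static void main(String[] args) {
        // ChangelogMode.all() is shorthand for a mode containing all four row kinds.
        ChangelogMode all =
                ChangelogMode.newBuilder()
                        .addContainedKind(RowKind.INSERT)
                        .addContainedKind(RowKind.UPDATE_BEFORE)
                        .addContainedKind(RowKind.UPDATE_AFTER)
                        .addContainedKind(RowKind.DELETE)
                        .build();
        // insertOnly() contains only INSERT, so with the option enabled the planner
        // treats the source as append-only wherever an insert-only stream is required.
        System.out.println(all.equals(ChangelogMode.all()));                 // true
        System.out.println(ChangelogMode.insertOnly().contains(RowKind.INSERT)); // true
        System.out.println(ChangelogMode.insertOnly().contains(RowKind.DELETE)); // false
    }
}
```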

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java

Lines changed: 6 additions & 1 deletion

```diff
@@ -108,6 +108,9 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         boolean assignUnboundedChunkFirst =
                 config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
 
+        boolean appendOnly =
+                config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+
         if (enableParallelRead) {
             validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
             validateIntegerOption(
@@ -153,7 +156,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
                         skipSnapshotBackFill,
                         parseOnLineSchemaChanges,
                         useLegacyJsonFormat,
-                        assignUnboundedChunkFirst);
+                        assignUnboundedChunkFirst,
+                        appendOnly);
     }
 
     @Override
@@ -202,6 +206,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES);
         options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
         options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
+        options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
         return options;
     }
 
```

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java

Lines changed: 102 additions & 0 deletions

```diff
@@ -2334,4 +2334,106 @@ void testBinaryHandlingModeWithBase64() throws Exception {
         assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
         result.getJobClient().get().cancel().get();
     }
+
+    @ParameterizedTest(name = "incrementalSnapshot = {0}")
+    @ValueSource(booleans = {true, false})
+    public void testReadChangelogAppendOnly(boolean incrementalSnapshot) throws Exception {
+        setup(incrementalSnapshot);
+        userDatabase1.createAndInitialize();
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE mysql_users ("
+                                + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
+                                + " table_name STRING METADATA VIRTUAL,"
+                                + " row_kind STRING METADATA FROM 'row_kind' VIRTUAL,"
+                                + " `id` DECIMAL(20, 0) NOT NULL,"
+                                + " name STRING,"
+                                + " address STRING,"
+                                + " phone_number STRING,"
+                                + " email STRING,"
+                                + " age INT,"
+                                + " primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.enabled' = '%s',"
+                                + " 'server-id' = '%s',"
+                                + " 'server-time-zone' = 'UTC',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '%s',"
+                                + " 'scan.read-changelog-as-append-only.enabled' = 'true'"
+                                + ")",
+                        MYSQL_CONTAINER.getHost(),
+                        MYSQL_CONTAINER.getDatabasePort(),
+                        userDatabase1.getUsername(),
+                        userDatabase1.getPassword(),
+                        userDatabase1.getDatabaseName(),
+                        "user_table_.*",
+                        incrementalSnapshot,
+                        getServerId(incrementalSnapshot),
+                        getSplitSize(incrementalSnapshot));
+
+        String sinkDDL =
+                "CREATE TABLE sink ("
+                        + " database_name STRING,"
+                        + " table_name STRING,"
+                        + " row_kind STRING,"
+                        + " `id` DECIMAL(20, 0) NOT NULL,"
+                        + " name STRING,"
+                        + " address STRING,"
+                        + " phone_number STRING,"
+                        + " email STRING,"
+                        + " age INT,"
+                        + " primary key (database_name, table_name, id) not enforced"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+
+        // async submit job
+        TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM mysql_users");
+
+        // wait for snapshot finished and begin binlog
+        waitForSinkSize("sink", 2);
+
+        try (Connection connection = userDatabase1.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+
+            statement.execute(
+                    "INSERT INTO user_table_1_2 VALUES (200,'user_200','Wuhan',123567891234);");
+            statement.execute(
+                    "INSERT INTO user_table_1_1 VALUES (300,'user_300','Hangzhou',123567891234, 'user_300@foo.com');");
+            statement.execute("UPDATE user_table_1_1 SET address='Beijing' WHERE id=300;");
+            statement.execute("UPDATE user_table_1_2 SET phone_number=88888888 WHERE id=121;");
+            statement.execute("DELETE FROM user_table_1_1 WHERE id=111;");
+        }
+
+        // waiting for binlog finished (5 more events)
+        waitForSinkSize("sink", 7);
+
+        List<String> expected =
+                Stream.of(
+                                "+I[%s, user_table_1_1, +I, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
+                                "+I[%s, user_table_1_2, +I, 121, user_121, Shanghai, 123567891234, null, null]",
+                                "+I[%s, user_table_1_2, +I, 200, user_200, Wuhan, 123567891234, null, null]",
+                                "+I[%s, user_table_1_1, +I, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
+                                "+I[%s, user_table_1_1, +U, 300, user_300, Beijing, 123567891234, user_300@foo.com, null]",
+                                "+I[%s, user_table_1_2, +U, 121, user_121, Shanghai, 88888888, null, null]",
+                                "+I[%s, user_table_1_1, -D, 111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
+                                "+I[%s, user_table_1_1, -U, 300, user_300, Hangzhou, 123567891234, user_300@foo.com, null]",
+                                "+I[%s, user_table_1_2, -U, 121, user_121, Shanghai, 123567891234, null, null]")
+                        .map(s -> String.format(s, userDatabase1.getDatabaseName()))
+                        .sorted()
+                        .collect(Collectors.toList());
+
+        List<String> actual = TestValuesTableFactory.getRawResultsAsStrings("sink");
+        Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+        result.getJobClient().get().cancel().get();
+    }
 }
```
