[FLINK-37120][pipeline-connector/mysql] Add ending split chunk first to avoid TaskManager OOM #3856

Open · wants to merge 10 commits into base: master
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md
@@ -389,6 +389,17 @@ Flink SQL> SELECT * FROM orders;
This is an experimental feature.
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.assign-ending-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to assign the ending chunk (EndingChunk) first during the snapshot reading phase.<br>
This helps reduce the risk of the TaskManager hitting an out-of-memory (OOM) error while synchronizing the last chunk in the snapshot phase.<br>
This is an experimental feature; it defaults to false.
</td>
</tr>
</tbody>
</table>
</div>
11 changes: 11 additions & 0 deletions docs/content.zh/docs/connectors/pipeline-connectors/mysql.md
@@ -312,6 +312,17 @@ pipeline:
<td>Boolean</td>
<td>Whether to treat the TINYINT(1) type as Boolean; defaults to true.</td>
</tr>
<tr>
<td>scan.incremental.snapshot.assign-ending-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to assign the ending chunk (EndingChunk) first during the snapshot reading phase.<br>
This helps reduce the risk of the TaskManager hitting an out-of-memory (OOM) error while synchronizing the last chunk in the snapshot phase.<br>
This is an experimental feature; it defaults to false.
</td>
</tr>
</tbody>
</table>
</div>
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -415,6 +415,17 @@ During a snapshot operation, the connector will query each included table to pro
When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.assign-ending-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to assign the ending chunk first during the snapshot reading phase.<br>
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
This is an experimental option; it defaults to false.
</td>
</tr>
</tbody>
</table>
</div>
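As a usage sketch (not part of this diff): with the option documented for the SQL connector, a job could opt in through the table DDL. The connection properties below are placeholders, and the surrounding Table API calls are standard Flink; only the last WITH option is the one added by this change.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class AssignEndingFirstDdlSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Placeholder mysql-cdc table; only the last option comes from this PR.
        tEnv.executeSql(
                "CREATE TABLE orders (\n"
                        + "  id BIGINT,\n"
                        + "  price DECIMAL(10, 2),\n"
                        + "  PRIMARY KEY (id) NOT ENFORCED\n"
                        + ") WITH (\n"
                        + "  'connector' = 'mysql-cdc',\n"
                        + "  'hostname' = 'localhost',\n"
                        + "  'port' = '3306',\n"
                        + "  'username' = 'flinkuser',\n"
                        + "  'password' = 'flinkpw',\n"
                        + "  'database-name' = 'mydb',\n"
                        + "  'table-name' = 'orders',\n"
                        // New flag: assign the unbounded ending chunk first during the snapshot phase.
                        + "  'scan.incremental.snapshot.assign-ending-first.enabled' = 'true'\n"
                        + ")");

        tEnv.executeSql("SELECT * FROM orders").print();
    }
}
```

With the flag on, the unbounded ending chunk is dispatched before the bounded ones, so the heaviest read happens at the start of the snapshot phase rather than at its tail.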
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -332,6 +332,17 @@ pipeline:
When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.assign-ending-first.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to assign the ending chunk first during the snapshot reading phase.<br>
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
This is an experimental option; it defaults to false.
</td>
</tr>
</tbody>
</table>
</div>
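For the pipeline connector, a rough sketch of how the new key is read into the source config, mirroring the factory and test changes below. The Configuration class import and the placeholder connection keys are assumptions based on the imports shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.cdc.common.configuration.Configuration;

import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST;

public class PipelineOptionSketch {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        // Placeholder connection settings for a pipeline-defined MySQL source.
        options.put("hostname", "localhost");
        options.put("port", "3306");
        options.put("username", "flinkuser");
        options.put("password", "flinkpw");
        options.put("tables", "mydb.orders");
        // The flag added by this PR; it is off by default.
        options.put("scan.incremental.snapshot.assign-ending-first.enabled", "true");

        Configuration config = Configuration.fromMap(options);
        // MySqlDataSourceFactory reads the flag the same way (see createDataSource below).
        boolean assignEndingChunkFirst = config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);
        System.out.println("assignEndingChunkFirst = " + assignEndingChunkFirst);
    }
}
```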
@@ -76,6 +76,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
@@ -150,6 +151,8 @@ public DataSource createDataSource(Context context) {
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
boolean isAssignEndingChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);

validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -201,7 +204,8 @@ public DataSource createDataSource(Context context) {
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
.useLegacyJsonFormat(useLegacyJsonFormat);
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignEndingChunkFirst(isAssignEndingChunkFirst);

List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);

@@ -336,6 +340,8 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(INCLUDE_COMMENTS_ENABLED);
options.add(USE_LEGACY_JSON_FORMAT);
options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST);
return options;
}

@@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
.defaultValue(true)
.withDescription(
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST =
ConfigOptions.key("scan.incremental.snapshot.assign-ending-first.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
}
@@ -37,9 +37,11 @@
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
@@ -258,11 +260,18 @@ public void testOptionalOption() {

// optional option
options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "false");
options.put(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.key(), "false");

Factory.Context context = new MockContext(Configuration.fromMap(options));
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
assertThat(factory.optionalOptions().contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED))
.isEqualTo(true);
assertThat(factory.optionalOptions().contains(PARSE_ONLINE_SCHEMA_CHANGES)).isEqualTo(true);
assertThat(
factory.optionalOptions()
.contains(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST))
.isEqualTo(true);

MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isEqualTo(false);
@@ -38,6 +38,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
protected final boolean closeIdleReaders;
protected final boolean skipSnapshotBackfill;
protected final boolean isScanNewlyAddedTableEnabled;
protected final boolean assignEndingChunkFirst;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
@@ -56,7 +57,8 @@ public BaseSourceConfig(
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
Properties dbzProperties,
Configuration dbzConfiguration) {
Configuration dbzConfiguration,
boolean assignEndingChunkFirst) {
this.startupOptions = startupOptions;
this.splitSize = splitSize;
this.splitMetaGroupSize = splitMetaGroupSize;
@@ -68,6 +70,7 @@
this.isScanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
this.dbzProperties = dbzProperties;
this.dbzConfiguration = dbzConfiguration;
this.assignEndingChunkFirst = assignEndingChunkFirst;
}

@Override
@@ -115,4 +118,9 @@ public Configuration getDbzConfiguration() {
public boolean isSkipSnapshotBackfill() {
return skipSnapshotBackfill;
}

@Override
public boolean isAssignEndingChunkFirst() {
return assignEndingChunkFirst;
}
}
@@ -73,7 +73,8 @@ public JdbcSourceConfig(
int connectionPoolSize,
String chunkKeyColumn,
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled) {
boolean isScanNewlyAddedTableEnabled,
boolean assignEndingChunkFirst) {
super(
startupOptions,
splitSize,
@@ -85,7 +86,8 @@
skipSnapshotBackfill,
isScanNewlyAddedTableEnabled,
dbzProperties,
dbzConfiguration);
dbzConfiguration,
assignEndingChunkFirst);
this.driverClassName = driverClassName;
this.hostname = hostname;
this.port = port;
@@ -60,6 +60,8 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();
protected boolean scanNewlyAddedTableEnabled =
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
protected boolean assignEndingChunkFirst =
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST.defaultValue();

/** Integer port number of the database server. */
public JdbcSourceConfigFactory hostname(String hostname) {
@@ -252,6 +254,14 @@ public JdbcSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdded
return this;
}

/**
* Whether to assign the ending chunk first during the snapshot reading phase. Defaults to false.
*/
public JdbcSourceConfigFactory assignEndingChunkFirst(boolean assignEndingChunkFirst) {
this.assignEndingChunkFirst = assignEndingChunkFirst;
return this;
}

@Override
public abstract JdbcSourceConfig create(int subtask);
}
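A small hedged sketch of the new fluent setter in use. The subclass below is hypothetical (only create() is abstract in the fragment above), and the base-class package path is an assumption; real connectors such as MySQL or Db2 ship their own factory implementations.

```java
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfigFactory;

public class AssignEndingChunkFirstBuilderSketch {

    // Hypothetical subclass, only here to make the fluent call concrete.
    static class IllustrativeFactory extends JdbcSourceConfigFactory {
        @Override
        public JdbcSourceConfig create(int subtask) {
            throw new UnsupportedOperationException("illustration only");
        }
    }

    public static void main(String[] args) {
        JdbcSourceConfigFactory factory =
                new IllustrativeFactory()
                        .hostname("localhost")
                        // ...other connection settings elided...
                        .assignEndingChunkFirst(true); // dispatch the unbounded ending chunk first
        System.out.println(factory);
    }
}
```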
@@ -40,6 +40,8 @@ public interface SourceConfig extends Serializable {

boolean isScanNewlyAddedTableEnabled();

boolean isAssignEndingChunkFirst();

/** Factory for the {@code SourceConfig}. */
@FunctionalInterface
interface Factory<C extends SourceConfig> extends Serializable {
@@ -137,4 +137,12 @@ public class SourceOptions {
.defaultValue(false)
.withDescription(
"Whether capture the newly added tables when restoring from a savepoint/checkpoint or not, by default is false.");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_FIRST =
ConfigOptions.key("scan.incremental.snapshot.assign-ending-first.enabled")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
}
@@ -470,7 +470,13 @@ private List<ChunkRange> splitEvenlySizedChunks(
}
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
// assign the ending split first, so both the largest and smallest unbounded chunks
// are completed in the first two splits
if (sourceConfig.isAssignEndingChunkFirst()) {
splits.add(0, ChunkRange.of(chunkStart, null));
} else {
splits.add(ChunkRange.of(chunkStart, null));
}
return splits;
}
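To make the effect of the flag concrete, a self-contained sketch of the reordering; plain strings stand in for ChunkRange and the boundary values are made up.

```java
import java.util.ArrayList;
import java.util.List;

public class EndingChunkFirstSketch {
    public static void main(String[] args) {
        List<String> splits = new ArrayList<>();
        // Bounded chunks produced by the even-size splitter.
        splits.add("[null, 1000)");
        splits.add("[1000, 2000)");
        splits.add("[2000, 3000)");
        String endingSplit = "[3000, null)"; // the unbounded tail chunk

        boolean assignEndingChunkFirst = true; // sourceConfig.isAssignEndingChunkFirst()
        if (assignEndingChunkFirst) {
            splits.add(0, endingSplit); // tail chunk is dispatched before the bounded ones
        } else {
            splits.add(endingSplit); // legacy behavior: tail chunk is read last
        }
        System.out.println(splits);
        // [[3000, null), [null, 1000), [1000, 2000), [2000, 3000)]
    }
}
```

Note that the even-size splitter guards the reordering behind the flag, while splitUnevenlySizedChunks below applies it unconditionally, which is what the reviewer comment further down refers to.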

@@ -498,7 +504,7 @@ private List<ChunkRange> splitUnevenlySizedChunks(
chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize);
}
// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
splits.add(0, ChunkRange.of(chunkStart, null));
Reviewer comment (Contributor): Good optimization, so both the largest and smallest unbounded chunks are completed in the first two splits, and subsequent chunks are of bounded size.

return splits;
}

@@ -85,7 +85,8 @@ public MockedSourceConfig(
connectionPoolSize,
null,
true,
isScanNewlyAddedTableEnabled);
isScanNewlyAddedTableEnabled,
false);
}

@Override
@@ -56,7 +56,8 @@ public Db2SourceConfig(
int connectMaxRetries,
int connectionPoolSize,
String chunkKeyColumn,
boolean skipSnapshotBackfill) {
boolean skipSnapshotBackfill,
boolean assignEndingChunkFirst) {
super(
startupOptions,
databaseList,
@@ -82,7 +83,8 @@
connectionPoolSize,
chunkKeyColumn,
skipSnapshotBackfill,
false);
false,
assignEndingChunkFirst);
}

@Override
@@ -103,6 +103,7 @@ public Db2SourceConfig create(int subtask) {
connectMaxRetries,
connectionPoolSize,
chunkKeyColumn,
skipSnapshotBackfill);
skipSnapshotBackfill,
assignEndingChunkFirst);
}
}
@@ -159,7 +159,11 @@ public Collection<SnapshotSplit> split(SplitContext splitContext) {
ChunkUtils.maxUpperBoundOfId(),
null,
schema);
snapshotSplits.add(lastSplit);
if (splitContext.isAssignEndingChunkFirst()) {
snapshotSplits.add(0, lastSplit);
} else {
snapshotSplits.add(lastSplit);
}

return snapshotSplits;
}
@@ -41,18 +41,21 @@ public class SplitContext {
private final BsonDocument collectionStats;
private final int chunkSizeMB;
private final int samplesPerChunk;
private final boolean assignEndingChunkFirst;

public SplitContext(
MongoClient mongoClient,
TableId collectionId,
BsonDocument collectionStats,
int chunkSizeMB,
int samplesPerChunk) {
int samplesPerChunk,
boolean assignEndingChunkFirst) {
this.mongoClient = mongoClient;
this.collectionId = collectionId;
this.collectionStats = collectionStats;
this.chunkSizeMB = chunkSizeMB;
this.samplesPerChunk = samplesPerChunk;
this.assignEndingChunkFirst = assignEndingChunkFirst;
}

public static SplitContext of(MongoDBSourceConfig sourceConfig, TableId collectionId) {
@@ -62,7 +65,8 @@ public static SplitContext of(MongoDBSourceConfig sourceConfig, TableId collecti
collectionId,
collStats(mongoClient, collectionId),
sourceConfig.getSplitSize(),
sourceConfig.getSamplesPerChunk());
sourceConfig.getSamplesPerChunk(),
sourceConfig.isAssignEndingChunkFirst());
}

public MongoClient getMongoClient() {
@@ -100,4 +104,8 @@ public long getAvgObjSizeInBytes() {
public boolean isShardedCollection() {
return collectionStats.getBoolean("sharded", BsonBoolean.FALSE).getValue();
}

public boolean isAssignEndingChunkFirst() {
return assignEndingChunkFirst;
}
}
@@ -131,7 +131,11 @@ public Collection<SnapshotSplit> split(SplitContext splitContext) {
ChunkUtils.maxUpperBoundOfId(),
null,
schema);
snapshotSplits.add(lastSplit);
if (splitContext.isAssignEndingChunkFirst()) {
snapshotSplits.add(0, lastSplit);
} else {
snapshotSplits.add(lastSplit);
}

return snapshotSplits;
}