Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-3239] Fix stack overflow caused by reading too many partitions in the filter #3240

Merged
merged 12 commits into from
Oct 16, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ public class AmoroManagementConf {
.defaultValue(60000L)
.withDescription("Interval for refreshing table metadata.");

/**
 * Upper bound on how many partitions needing optimization are collected while evaluating a
 * table during refresh (default 100). The value is read in {@code setup} and handed through
 * {@code TableRuntimeRefreshExecutor} to {@code OptimizingEvaluator}, which applies it as a
 * stream {@code limit} when gathering the partitions whose evaluators report
 * {@code isNecessary()}.
 *
 * <p>NOTE(review): per the change title, the cap exists to avoid a stack overflow when too many
 * partitions end up in the partition filter -- confirm against the planner code.
 */
public static final ConfigOption<Integer> REFRESH_MAX_PENDING_PARTITIONS =
ConfigOptions.key("refresh-tables.max-pending-partition-count")
.intType()
.defaultValue(100)
.withDescription("Filters will not be used beyond that number of partitions");

public static final ConfigOption<Long> BLOCKER_TIMEOUT =
ConfigOptions.key("blocker.timeout")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,18 @@ public class OptimizingEvaluator {
protected final MixedTable mixedTable;
protected final TableRuntime tableRuntime;
protected final TableSnapshot currentSnapshot;
protected final int maxPendingPartitions;
protected boolean isInitialized = false;

protected Map<String, PartitionEvaluator> needOptimizingPlanMap = Maps.newHashMap();
protected Map<String, PartitionEvaluator> partitionPlanMap = Maps.newHashMap();

public OptimizingEvaluator(TableRuntime tableRuntime, MixedTable table) {
public OptimizingEvaluator(
TableRuntime tableRuntime, MixedTable table, int maxPendingPartitions) {
this.tableRuntime = tableRuntime;
this.mixedTable = table;
this.currentSnapshot = IcebergTableUtil.getSnapshot(table, tableRuntime);
this.maxPendingPartitions = maxPendingPartitions;
}

public TableRuntime getTableRuntime() {
Expand Down Expand Up @@ -137,6 +140,7 @@ private void initPartitionPlans(TableFileScanHelper tableFileScanHelper) {
needOptimizingPlanMap.putAll(
partitionPlanMap.entrySet().stream()
.filter(entry -> entry.getValue().isNecessary())
.limit(maxPendingPartitions)
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public OptimizingPlanner(
MixedTable table,
double availableCore,
long maxInputSizePerThread) {
super(tableRuntime, table);
super(tableRuntime, table, Integer.MAX_VALUE);
this.partitionFilter =
tableRuntime.getPendingInput() == null
? Expressions.alwaysTrue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void setup(TableManager tableManager, Configurations conf) {
new TableRuntimeRefreshExecutor(
tableManager,
conf.getInteger(AmoroManagementConf.REFRESH_TABLES_THREAD_COUNT),
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL));
conf.getLong(AmoroManagementConf.REFRESH_TABLES_INTERVAL),
conf.getInteger(AmoroManagementConf.REFRESH_MAX_PENDING_PARTITIONS));
if (conf.getBoolean(AmoroManagementConf.AUTO_CREATE_TAGS_ENABLED)) {
this.tagsAutoCreatingExecutor =
new TagsAutoCreatingExecutor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ public class TableRuntimeRefreshExecutor extends BaseTableExecutor {

// Refresh interval in milliseconds (configured value; 1 minute by default)
private final long interval;
private final int maxPendingPartitions;

public TableRuntimeRefreshExecutor(TableManager tableRuntimes, int poolSize, long interval) {
public TableRuntimeRefreshExecutor(
TableManager tableRuntimes, int poolSize, long interval, int maxPendingPartitions) {
super(tableRuntimes, poolSize);
this.interval = interval;
this.maxPendingPartitions = maxPendingPartitions;
}

@Override
Expand All @@ -48,7 +51,8 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {

private void tryEvaluatingPendingInput(TableRuntime tableRuntime, MixedTable table) {
if (tableRuntime.isOptimizingEnabled() && !tableRuntime.getOptimizingStatus().isProcessing()) {
OptimizingEvaluator evaluator = new OptimizingEvaluator(tableRuntime, table);
OptimizingEvaluator evaluator =
new OptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
if (evaluator.isNecessary()) {
OptimizingEvaluator.PendingInput pendingInput = evaluator.getOptimizingPendingInput();
logger.debug(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ protected void reboot() throws InterruptedException {
private class TableRuntimeRefresher extends TableRuntimeRefreshExecutor {

public TableRuntimeRefresher() {
super(tableService(), 1, Integer.MAX_VALUE);
super(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
}

void refreshPending() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ private void appendData(UnkeyedTable table, int id) {

void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testFragmentFiles() {
}

protected OptimizingEvaluator buildOptimizingEvaluator() {
return new OptimizingEvaluator(getTableRuntime(), getMixedTable());
return new OptimizingEvaluator(getTableRuntime(), getMixedTable(), 100);
}

protected void assertEmptyInput(OptimizingEvaluator.PendingInput input) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ private void appendPosDelete(UnkeyedTable table) {

void refreshPending() {
TableRuntimeRefreshExecutor refresher =
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE);
new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE);
refresher.execute(tableService().getRuntime(serverTableIdentifier().getId()));
refresher.dispose();
}
Expand Down
1 change: 1 addition & 0 deletions dist/src/main/amoro-bin/conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ ams:
refresh-tables:
thread-count: 10
interval: 60000 # 1min
max-pending-partition-count: 100 # default 100

self-optimizing:
commit-thread-count: 10
Expand Down
Loading