diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java index 2fbad3effb..5a14d6e5d5 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/plan/OptimizingPlanner.java @@ -51,6 +51,8 @@ public class OptimizingPlanner extends OptimizingEvaluator { private final PartitionPlannerFactory partitionPlannerFactory; private List tasks; + private List actualPartitionPlans; + public OptimizingPlanner(TableRuntime tableRuntime, ArcticTable table, double availableCore) { super(tableRuntime, table); this.partitionFilter = @@ -69,17 +71,17 @@ protected PartitionEvaluator buildEvaluator(String partitionPath) { } public Map getFromSequence() { - return partitionPlanMap.entrySet().stream() + return actualPartitionPlans.stream() .collect( Collectors.toMap( - Map.Entry::getKey, e -> ((AbstractPartitionPlan) e.getValue()).getFromSequence())); + AbstractPartitionPlan::getPartition, AbstractPartitionPlan::getFromSequence)); } public Map getToSequence() { - return partitionPlanMap.entrySet().stream() + return actualPartitionPlans.stream() .collect( Collectors.toMap( - Map.Entry::getKey, e -> ((AbstractPartitionPlan) e.getValue()).getToSequence())); + AbstractPartitionPlan::getPartition, AbstractPartitionPlan::getToSequence)); } @Override @@ -126,10 +128,10 @@ public List planTasks() { evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder())); double maxInputSize = MAX_INPUT_FILE_SIZE_PER_THREAD * availableCore; - List inputPartitions = Lists.newArrayList(); + actualPartitionPlans = Lists.newArrayList(); long actualInputSize = 0; for (PartitionEvaluator evaluator : evaluators) { - inputPartitions.add(evaluator); + actualPartitionPlans.add((AbstractPartitionPlan) evaluator); actualInputSize += evaluator.getCost(); if (actualInputSize > maxInputSize) { break; @@ -138,9 +140,8 @@ public List planTasks() { double avgThreadCost = actualInputSize / availableCore; List tasks = Lists.newArrayList(); - for (PartitionEvaluator evaluator : inputPartitions) { - tasks.addAll( - ((AbstractPartitionPlan) evaluator).splitTasks((int) (actualInputSize / avgThreadCost))); + for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) { + tasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / avgThreadCost))); } if (!tasks.isEmpty()) { if (evaluators.stream()