Skip to content

Commit

Permalink
[AMORO-2253] Mixed Format optimized-sequence should not contains th…
Browse files Browse the repository at this point in the history
…e skipped partitions (#2249)

(cherry picked from commit 4f70e5e)
  • Loading branch information
wangtaohz authored and shidayang committed Nov 6, 2023
1 parent 25237fb commit 2476e1d
Showing 1 changed file with 10 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class OptimizingPlanner extends OptimizingEvaluator {
private final PartitionPlannerFactory partitionPlannerFactory;
private List<TaskDescriptor> tasks;

private List<AbstractPartitionPlan> actualPartitionPlans;

public OptimizingPlanner(TableRuntime tableRuntime, ArcticTable table, double availableCore) {
super(tableRuntime, table);
this.partitionFilter =
Expand All @@ -69,17 +71,17 @@ protected PartitionEvaluator buildEvaluator(String partitionPath) {
}

public Map<String, Long> getFromSequence() {
return partitionPlanMap.entrySet().stream()
return actualPartitionPlans.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((AbstractPartitionPlan) e.getValue()).getFromSequence()));
AbstractPartitionPlan::getPartition, AbstractPartitionPlan::getFromSequence));
}

public Map<String, Long> getToSequence() {
return partitionPlanMap.entrySet().stream()
return actualPartitionPlans.stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((AbstractPartitionPlan) e.getValue()).getToSequence()));
AbstractPartitionPlan::getPartition, AbstractPartitionPlan::getToSequence));
}

@Override
Expand Down Expand Up @@ -126,10 +128,10 @@ public List<TaskDescriptor> planTasks() {
evaluators.sort(Comparator.comparing(PartitionEvaluator::getWeight, Comparator.reverseOrder()));

double maxInputSize = MAX_INPUT_FILE_SIZE_PER_THREAD * availableCore;
List<PartitionEvaluator> inputPartitions = Lists.newArrayList();
actualPartitionPlans = Lists.newArrayList();
long actualInputSize = 0;
for (PartitionEvaluator evaluator : evaluators) {
inputPartitions.add(evaluator);
actualPartitionPlans.add((AbstractPartitionPlan) evaluator);
actualInputSize += evaluator.getCost();
if (actualInputSize > maxInputSize) {
break;
Expand All @@ -138,9 +140,8 @@ public List<TaskDescriptor> planTasks() {

double avgThreadCost = actualInputSize / availableCore;
List<TaskDescriptor> tasks = Lists.newArrayList();
for (PartitionEvaluator evaluator : inputPartitions) {
tasks.addAll(
((AbstractPartitionPlan) evaluator).splitTasks((int) (actualInputSize / avgThreadCost)));
for (AbstractPartitionPlan partitionPlan : actualPartitionPlans) {
tasks.addAll(partitionPlan.splitTasks((int) (actualInputSize / avgThreadCost)));
}
if (!tasks.isEmpty()) {
if (evaluators.stream()
Expand Down

0 comments on commit 2476e1d

Please sign in to comment.