Skip to content

Commit

Permalink
release memory after pushing down the predicate & release memory befo…
Browse files Browse the repository at this point in the history
…re retry of QueryExectuion & reserverMemoryImmediately after dispatching the FIs for the last batch of memory occupied by the FE.
  • Loading branch information
lancelly committed May 23, 2024
1 parent dfa3199 commit 50af56c
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public MPPQueryContext(

public void prepareForRetry() {
this.initResultNodeContext();
this.releaseMemoryForFrontEnd();
}

private void initResultNodeContext() {
Expand Down Expand Up @@ -310,17 +311,25 @@ public void setLogicalOptimizationCost(long logicalOptimizeCost) {
* single-threaded manner.
*/
public void reserveMemoryForFrontEnd(final long bytes) {
bytesToBeReservedForFrontEnd += bytes;
if (bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, queryId.getId());
reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
bytesToBeReservedForFrontEnd = 0;
this.bytesToBeReservedForFrontEnd += bytes;
if (this.bytesToBeReservedForFrontEnd >= MEMORY_BATCH_THRESHOLD) {
reserveMemoryForFrontEndImmediately();
}
}

public void reserveMemoryForFrontEndImmediately() {
LOCAL_EXECUTION_PLANNER.reserveMemoryForQueryFrontEnd(
bytesToBeReservedForFrontEnd, reservedBytesInTotalForFrontEnd, queryId.getId());
this.reservedBytesInTotalForFrontEnd += bytesToBeReservedForFrontEnd;
this.bytesToBeReservedForFrontEnd = 0;
}

public void releaseMemoryForFrontEnd() {
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(reservedBytesInTotalForFrontEnd);
reservedBytesInTotalForFrontEnd = 0;
releaseMemoryForFrontEnd(reservedBytesInTotalForFrontEnd);
}

public void releaseMemoryForFrontEnd(final long bytes) {
LOCAL_EXECUTION_PLANNER.releaseToFreeMemoryForOperators(bytes);
reservedBytesInTotalForFrontEnd -= bytes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ public void start() {
PERFORMANCE_OVERVIEW_METRICS.recordPlanCost(System.nanoTime() - startTime);
schedule();

// The last batch of memory reserved by the front end
context.reserveMemoryForFrontEndImmediately();

// friendly for gc
logicalPlan.clearUselessMemory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,28 @@ public PlanNode visitRawDataAggregation(RawDataAggregationNode node, RewriterCon

PlanNode resultNode = convergeWithTimeJoin(sourceNodeList, node.getScanOrder(), context);
resultNode = planProject(resultNode, node, context);

// After pushing down the predicate, the original scan nodes are no longer needed, we should
// release the memory that they occupied.
releaseMemoryForOldScanNodes(node, context.getContext());
return resultNode;
}
// cannot push down
return node;
}

private void releaseMemoryForOldScanNodes(PlanNode node, MPPQueryContext queryContext) {
if (node == null) {
return;
}
if (node instanceof SeriesScanSourceNode) {
SeriesScanSourceNode scanNode = (SeriesScanSourceNode) node;
queryContext.releaseMemoryForFrontEnd(scanNode.ramBytesUsed());
} else if (node instanceof FullOuterTimeJoinNode) {
node.getChildren().forEach(child -> releaseMemoryForOldScanNodes(child, queryContext));
}
}

private void createAggregationDescriptor(
FunctionExpression sourceExpression,
AggregationStep curStep,
Expand Down

0 comments on commit 50af56c

Please sign in to comment.