Skip to content

Commit

Permalink
[multistage][refactor] clean up planner (apache#12070)
Browse files Browse the repository at this point in the history
* [cleaup] remove plan fragment metadata, it is same concept as dispatchablePlanMetadata
* [cleanup] clean separate component logics

---------

Co-authored-by: Rong Rong <rongr@startree.ai>
  • Loading branch information
walterddr and Rong Rong authored Nov 29, 2023
1 parent c57117a commit b9ed378
Show file tree
Hide file tree
Showing 21 changed files with 32 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.catalog.PinotCatalog;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.query.type.TypeFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@
import org.apache.calcite.tools.RelBuilder;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.PhysicalExplainPlanVisitor;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.explain.PhysicalExplainPlanVisitor;
import org.apache.pinot.query.planner.logical.PinotLogicalQueryPlanner;
import org.apache.pinot.query.planner.logical.RelToPlanNodeConverter;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.PinotDispatchPlanner;
import org.apache.pinot.query.routing.WorkerManager;
import org.apache.pinot.query.type.TypeFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@ public class PlanFragment {

private final int _fragmentId;
private final PlanNode _fragmentRoot;
private final PlanFragmentMetadata _fragmentMetadata;

private final List<PlanFragment> _children;

public PlanFragment(int fragmentId, PlanNode fragmentRoot, PlanFragmentMetadata fragmentMetadata,
List<PlanFragment> children) {
public PlanFragment(int fragmentId, PlanNode fragmentRoot, List<PlanFragment> children) {
_fragmentId = fragmentId;
_fragmentRoot = fragmentRoot;
_fragmentMetadata = fragmentMetadata;
_children = children;
}

Expand All @@ -51,10 +48,6 @@ public PlanNode getFragmentRoot() {
return _fragmentRoot;
}

public PlanFragmentMetadata getFragmentMetadata() {
return _fragmentMetadata;
}

public List<PlanFragment> getChildren() {
return _children;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.planner;
package org.apache.pinot.query.planner.explain;

import java.util.ArrayList;
import java.util.Comparator;
Expand All @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
import org.apache.pinot.query.planner.plannode.FilterNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.PlanFragmentMetadata;
import org.apache.pinot.query.planner.QueryPlan;
import org.apache.pinot.query.planner.QueryPlanMetadata;
import org.apache.pinot.query.planner.SubPlan;
Expand Down Expand Up @@ -102,8 +101,7 @@ public SubPlan makePlan(QueryPlan queryPlan) {
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
false, false);
subPlanRootSenderNode.addInput(subPlanRoot);
PlanFragment planFragment1 =
new PlanFragment(1, subPlanRootSenderNode, new PlanFragmentMetadata(), new ArrayList<>());
PlanFragment planFragment1 = new PlanFragment(1, subPlanRootSenderNode, new ArrayList<>());
planFragmentMap.put(1, planFragment1);
for (Int2ObjectMap.Entry<IntList> entry : childPlanFragmentIdsMap.int2ObjectEntrySet()) {
PlanFragment planFragment = planFragmentMap.get(entry.getIntKey());
Expand All @@ -117,8 +115,7 @@ public SubPlan makePlan(QueryPlan queryPlan) {
new MailboxReceiveNode(0, subPlanRoot.getDataSchema(), subPlanRoot.getPlanFragmentId(),
RelDistribution.Type.BROADCAST_DISTRIBUTED, PinotRelExchangeType.getDefaultExchangeType(), null, null,
false, false, subPlanRootSenderNode);
PlanFragment rootPlanFragment =
new PlanFragment(0, rootReceiveNode, new PlanFragmentMetadata(), Collections.singletonList(planFragment1));
PlanFragment rootPlanFragment = new PlanFragment(0, rootReceiveNode, Collections.singletonList(planFragment1));
SubPlan subPlan = new SubPlan(rootPlanFragment, subPlanContext._subPlanIdToMetadataMap.get(0), new ArrayList<>());
subPlanMap.put(subPlanId, subPlan);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.logical.PinotRelExchangeType;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.PlanFragmentMetadata;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.ExchangeNode;
Expand Down Expand Up @@ -163,7 +162,7 @@ public PlanNode visitExchange(ExchangeNode node, Context context) {
node.isPartitioned());
mailboxSendNode.addInput(nextPlanFragmentRoot);
_planFragmentMap.put(senderPlanFragmentId,
new PlanFragment(senderPlanFragmentId, mailboxSendNode, new PlanFragmentMetadata(), new ArrayList<>()));
new PlanFragment(senderPlanFragmentId, mailboxSendNode, new ArrayList<>()));

// Return the MailboxReceiveNode as the leave node of the current PlanFragment.
return new MailboxReceiveNode(receiverPlanFragmentId, nextPlanFragmentRoot.getDataSchema(), senderPlanFragmentId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Set;
import org.apache.calcite.util.Pair;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.MailboxMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.planner;
package org.apache.pinot.query.planner.physical;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.routing.TimeBoundaryInfo;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerMetadata;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.planner;
package org.apache.pinot.query.planner.physical;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.Set;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.SubPlan;
import org.apache.pinot.query.planner.physical.colocated.GreedyShuffleRewriteVisitor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
*/
package org.apache.pinot.query.planner.plannode;

import org.apache.pinot.query.planner.PhysicalExplainPlanVisitor;
import org.apache.pinot.query.planner.explain.PhysicalExplainPlanVisitor;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;


/**
Expand All @@ -27,7 +28,7 @@
* enforced traversal order, and should be implemented by subclasses.
*
* <p>It is recommended that implementors use private constructors and static methods to access main
* functionality (see {@link PhysicalExplainPlanVisitor#explain(org.apache.pinot.query.planner.DispatchableSubPlan)}
* functionality (see {@link PhysicalExplainPlanVisitor#explain(DispatchableSubPlan)}
* as an example of a usage of this pattern.
*
* @param <T> the return type for all visitsPlanNodeVisitor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelDistribution;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.PhysicalExplainPlanVisitor;
import org.apache.pinot.query.planner.PlannerUtils;
import org.apache.pinot.query.planner.explain.PhysicalExplainPlanVisitor;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.AggregateNode;
import org.apache.pinot.query.planner.plannode.FilterNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import java.util.Map;
import org.apache.pinot.common.proto.Plan;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.serde.ProtoProperties;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.OperatorStats;
import org.apache.pinot.spi.utils.JsonUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.AbstractPlanNode;
import org.apache.pinot.query.planner.plannode.StageNodeSerDeUtils;
import org.apache.pinot.query.routing.MailboxMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
import org.apache.pinot.core.util.DataBlockExtractUtils;
import org.apache.pinot.core.util.trace.TracedThreadFactory;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.PlanFragment;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.MailboxReceiveNode;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
import org.apache.pinot.query.QueryServerEnclosure;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.service.server.QueryServer;
import org.apache.pinot.query.testutils.QueryTestUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.pinot.query.QueryEnvironment;
import org.apache.pinot.query.QueryEnvironmentTestBase;
import org.apache.pinot.query.QueryTestSet;
import org.apache.pinot.query.planner.DispatchablePlanFragment;
import org.apache.pinot.query.planner.DispatchableSubPlan;
import org.apache.pinot.query.planner.physical.DispatchablePlanFragment;
import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.WorkerMetadata;
Expand Down

0 comments on commit b9ed378

Please sign in to comment.