Skip to content

Commit

Permalink
aggregation does not use TopKNode
Browse files Browse the repository at this point in the history
  • Loading branch information
Beyyes committed Jan 9, 2024
1 parent e452b07 commit 100d7c4
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.LimitNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.OffsetNode;
Expand Down Expand Up @@ -142,7 +143,8 @@ public PlanNode visitLimit(LimitNode limitNode, RewriterContext rewriterContext)
} else {
return topKNode;
}
} else if (limitNode.getChild() instanceof TransformNode) {
} else if (limitNode.getChild() instanceof TransformNode
&& !(limitNode.getChild() instanceof FilterNode)) {
TransformNode transformNode = (TransformNode) limitNode.getChild();
if (transformNode.getChild() instanceof SortNode) {
SortNode sortNode = (SortNode) transformNode.getChild();
Expand Down Expand Up @@ -217,7 +219,8 @@ public PlanNode visitOffset(OffsetNode offsetNode, RewriterContext rewriterConte
sortNode.getOutputColumnNames());
topKNode.setChildren(sortNode.getChildren());
offsetNode.setChild(topKNode);
} else if (offsetNode.getChild() instanceof TransformNode) {
} else if (offsetNode.getChild() instanceof TransformNode
&& !(offsetNode.getChild() instanceof FilterNode)) {
TransformNode transformNode = (TransformNode) offsetNode.getChild();
if (transformNode.getChild() instanceof SortNode) {
SortNode sortNode = (SortNode) transformNode.getChild();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,10 +767,12 @@ public LogicalPlanBuilder planDeviceView(

// 1. has LIMIT and LIMIT_VALUE is smaller than 1000000.
// 2. `order by based on time` or `order by based on expression`.
// 3. no aggregation.
// when satisfy all cases above will use ToKNode.
if (queryStatement.hasLimit()
&& queryStatement.hasOrderBy()
&& !queryStatement.isOrderByBasedOnDevice()
&& !queryStatement.isAggregationQuery()
&& limitValue <= LIMIT_VALUE_USE_TOP_K) {

TopKNode topKNode =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.iotdb.db.queryengine.plan.planner.plan.node.process;

public class Main {
public static void main(String[] args) {
for (String s : dataSet) {
System.out.println(s + ";");
}
}

private static final String[] dataSet =
new String[] {
"CREATE DATABASE root.testWithoutAllNull",
"CREATE TIMESERIES root.testWithoutAllNull.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
"CREATE TIMESERIES root.testWithoutAllNull.d1.s2 WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
"CREATE TIMESERIES root.testWithoutAllNull.d1.s3 WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s1) " + "values(6, 26)",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s2) " + "values(7, false)",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s1,s2) " + "values(9, 29, true)",
"flush",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s1,s2) " + "values(10, 20, true)",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s1,s2,s3) "
+ "values(11, 21, false, 11.1)",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s1,s2) " + "values(12, 22, true)",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s1,s2,s3) "
+ "values(13, 23, false, 33.3)",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s1,s3) " + "values(14, 24, 44.4)",
"INSERT INTO root.testWithoutAllNull.d1(timestamp,s2,s3) " + "values(15, true, 55.5)",
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.join.FullOuterTimeJoinNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;

import org.junit.Test;
Expand Down Expand Up @@ -599,29 +600,27 @@ public void orderByTimeTest4() {
}

/*
* IdentitySinkNode-24
* └──TopK-23
* ├──SingleDeviceView-5
* │ └──AggregationNode-10
* │ ├──SeriesAggregationScanNode-11:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──ExchangeNode-17: [SourceAddress:192.0.2.1/test.2.0/20]
* ├──ExchangeNode-19: [SourceAddress:192.0.3.1/test.3.0/21]
* └──SingleDeviceView-7
* └──AggregationNode-13
* ├──SeriesAggregationScanNode-14:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* └──ExchangeNode-18: [SourceAddress:192.0.4.1/test.4.0/22]
*
* IdentitySinkNode-20
* └──SeriesAggregationScanNode-12:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
*
* IdentitySinkNode-21
* └──SingleDeviceView-6
* └──SeriesAggregationScanNode-2:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
*
* IdentitySinkNode-22
* └──TopK-4
* ├──TopK-16
* │ ├──SingleDeviceView-5
* │ │ └──AggregationNode-8
* │ │ ├──SeriesAggregationScanNode-9:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ │ └──ExchangeNode-14: [SourceAddress:192.0.2.1/test.2.0/19]
* │ └──SingleDeviceView-7
* │ └──AggregationNode-11
* │ ├──SeriesAggregationScanNode-12:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──ExchangeNode-15: [SourceAddress:192.0.4.1/test.3.0/20]
* └──ExchangeNode-18: [SourceAddress:192.0.3.1/test.4.0/21]
*
* IdentitySinkNode-19
* └──SeriesAggregationScanNode-10:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
*
* IdentitySinkNode-20
* └──SeriesAggregationScanNode-13:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
*
* IdentitySinkNode-21
* └──TopK-17
* └──SingleDeviceView-6
* └──SeriesAggregationScanNode-2:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* └──SeriesAggregationScanNode-15:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
*/
@Test
public void orderByTimeTest5() {
Expand All @@ -637,19 +636,19 @@ public void orderByTimeTest5() {
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
assertEquals(2, firstFiTopNode.getChildren().size());
assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof SingleDeviceViewNode);
assertEquals(3, firstFiTopNode.getChildren().size());
assertTrue(firstFiTopNode.getChildren().get(0) instanceof SingleDeviceViewNode);
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(2) instanceof SingleDeviceViewNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SeriesAggregationScanNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(1) instanceof SingleDeviceViewNode);
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SingleDeviceViewNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(1).getChildren().get(0)
instanceof AggregationNode);
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
plan.getInstances().get(3).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SeriesAggregationScanNode);

// aggregation + order by time + group by time, has LIMIT
// SingleDeviceViewNode + TopKNode
Expand All @@ -663,19 +662,19 @@ public void orderByTimeTest5() {
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
assertEquals(2, firstFiTopNode.getChildren().size());
assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof SingleDeviceViewNode);
assertEquals(3, firstFiTopNode.getChildren().size());
assertTrue(firstFiTopNode.getChildren().get(0) instanceof SingleDeviceViewNode);
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(2) instanceof SingleDeviceViewNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SeriesAggregationScanNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(1) instanceof SingleDeviceViewNode);
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SingleDeviceViewNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(1).getChildren().get(0)
instanceof AggregationNode);
assertTrue(firstFiTopNode.getChildren().get(1) instanceof ExchangeNode);
plan.getInstances().get(3).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SeriesAggregationScanNode);
}

@Test
Expand Down Expand Up @@ -917,33 +916,34 @@ public void orderByExpressionTest3() {
}

/*
* IdentitySinkNode-34
* └──TopK-10
* └──DeviceView-12
* ├──AggregationNode-17
* │ ├──SeriesAggregationScanNode-13:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ ├──SeriesAggregationScanNode-15:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──ExchangeNode-28: [SourceAddress:192.0.2.1/test.2.0/31]
* ├──ExchangeNode-30: [SourceAddress:192.0.3.1/test.3.0/32]
* └──AggregationNode-26
* ├──SeriesAggregationScanNode-22:[SeriesPath: root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* ├──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* └──ExchangeNode-29: [SourceAddress:192.0.4.1/test.4.0/33]
*
* IdentitySinkNode-31
* └──HorizontallyConcatNode-18
* ├──SeriesAggregationScanNode-14:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* └──SeriesAggregationScanNode-16:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* IdentitySinkNode-38
* └──TransformNode-12
* └──TopK-37
* └──DeviceView-14
* ├──AggregationNode-19
* │ ├──SeriesAggregationScanNode-15:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ ├──SeriesAggregationScanNode-17:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──ExchangeNode-31: [SourceAddress:192.0.2.1/test.2.0/34]
* ├──ExchangeNode-33: [SourceAddress:192.0.3.1/test.3.0/35]
* └──AggregationNode-28
* ├──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* ├──SeriesAggregationScanNode-26:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* └──ExchangeNode-32: [SourceAddress:192.0.4.1/test.4.0/36]
*
* IdentitySinkNode-32
* └──HorizontallyConcatNode-21
* ├──SeriesAggregationScanNode-19:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* └──SeriesAggregationScanNode-20:[SeriesPath: root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* IdentitySinkNode-34
* └──HorizontallyConcatNode-20
* ├──SeriesAggregationScanNode-16:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* └──SeriesAggregationScanNode-18:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
*
* IdentitySinkNode-33
* └──HorizontallyConcatNode-27
* ├──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesAggregationScanNode-25:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* IdentitySinkNode-35
* └──HorizontallyConcatNode-23
* ├──SeriesAggregationScanNode-21:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* └──SeriesAggregationScanNode-22:[SeriesPath: root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
*
* IdentitySinkNode-36
* └──HorizontallyConcatNode-29
* ├──SeriesAggregationScanNode-25:[SeriesPath: root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesAggregationScanNode-27:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
*/
@Test
public void orderByExpressionTest4() {
Expand All @@ -958,11 +958,18 @@ public void orderByExpressionTest4() {
assertEquals(4, plan.getInstances().size());
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TopKNode);
assertTrue(firstFiTopNode.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof AggregationNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(1) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(2) instanceof AggregationNode);
assertTrue(firstFiTopNode instanceof TransformNode);
assertTrue(firstFiTopNode.getChildren().get(0) instanceof TopKNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof DeviceViewNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(1)
instanceof ExchangeNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(2)
instanceof AggregationNode);
for (int i = 1; i < 4; i++) {
assertTrue(
plan.getInstances().get(i).getFragment().getPlanNodeTree().getChildren().get(0)
Expand Down

0 comments on commit 100d7c4

Please sign in to comment.