Skip to content

Commit

Permalink
aggregation align by device order by use TopKNode
Browse files Browse the repository at this point in the history
  • Loading branch information
Beyyes committed Jan 9, 2024
1 parent 9959ed1 commit e452b07
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -765,9 +765,52 @@ public LogicalPlanBuilder planDeviceView(
? queryStatement.getRowOffset() + queryStatement.getRowLimit()
: queryStatement.getRowLimit();

if (queryStatement.hasOrderBy() && !queryStatement.isOrderByBasedOnDevice()) {
// only one device, use DeviceViewNode
// 1. has LIMIT and LIMIT_VALUE is smaller than 1000000.
// 2. `order by based on time` or `order by based on expression`.
// when satisfy all cases above will use ToKNode.
if (queryStatement.hasLimit()
&& queryStatement.hasOrderBy()
&& !queryStatement.isOrderByBasedOnDevice()
&& limitValue <= LIMIT_VALUE_USE_TOP_K) {

TopKNode topKNode =
new TopKNode(
context.getQueryId().genPlanNodeId(),
(int) limitValue,
orderByParameter,
outputColumnNames);

// if value filter exists, need add a LIMIT-NODE as the child node of TopKNode
long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1;

// only order by based on time, use TopKNode + SingleDeviceViewNode
if (queryStatement.isOrderByBasedOnTime() && !queryStatement.hasOrderByExpression()) {
addSingleDeviceViewNodes(
topKNode,
deviceNameToSourceNodesMap,
outputColumnNames,
deviceToMeasurementIndexesMap,
valueFilterLimit);
} else {
// has order by expression, use TopKNode + DeviceViewNode
topKNode.addChild(
addDeviceViewNode(
orderByParameter,
outputColumnNames,
deviceToMeasurementIndexesMap,
deviceNameToSourceNodesMap,
valueFilterLimit));
}

analysis.setUseTopKNode();
this.root = topKNode;
}
// 1. `order by based on time` + `no order by expression`.
// 2. no LIMIT or LIMIT_VALUE is larger than 1000000.
// when satisfy all above requirements use MergeSortNode.
else if (queryStatement.isOrderByBasedOnTime() && !queryStatement.hasOrderByExpression()) {
if (deviceNameToSourceNodesMap.size() == 1) {
// only one device, use DeviceViewNode, no need MergeSortNode
this.root =
addDeviceViewNode(
orderByParameter,
Expand All @@ -776,50 +819,20 @@ public LogicalPlanBuilder planDeviceView(
deviceNameToSourceNodesMap,
-1);
} else {
// 1. `order by based on time` or `order by based on expression`, i.e. not order by device.
// 2. limitValue is greater than 0 than is less than LIMIT_VALUE_USE_TOP_K.
// when satisfy all above requirements use ToKNode.
MultiChildProcessNode processNode;
if (limitValue > 0 && limitValue < LIMIT_VALUE_USE_TOP_K) {
analysis.setUseTopKNode();
processNode =
new TopKNode(
context.getQueryId().genPlanNodeId(),
(int) limitValue,
orderByParameter,
outputColumnNames);
} else {
processNode =
new MergeSortNode(
context.getQueryId().genPlanNodeId(), orderByParameter, outputColumnNames);
}

// if value filter exists, need add a LIMIT-NODE as the child node of processNode
long valueFilterLimit = queryStatement.hasWhere() ? limitValue : -1;

// order by based on time, use SingleDeviceViewNode
if (queryStatement.isOrderByBasedOnTime() && !queryStatement.hasOrderByExpression()) {
addSingleDeviceViewNodes(
processNode,
deviceNameToSourceNodesMap,
outputColumnNames,
deviceToMeasurementIndexesMap,
valueFilterLimit);
} else {
// order by based on expression, use DeviceViewNode
processNode.addChild(
addDeviceViewNode(
orderByParameter,
outputColumnNames,
deviceToMeasurementIndexesMap,
deviceNameToSourceNodesMap,
valueFilterLimit));
}

this.root = processNode;
// otherwise use MergeSortNode + SingleDeviceViewNode
MergeSortNode mergeSortNode =
new MergeSortNode(
context.getQueryId().genPlanNodeId(), orderByParameter, outputColumnNames);
addSingleDeviceViewNodes(
mergeSortNode,
deviceNameToSourceNodesMap,
outputColumnNames,
deviceToMeasurementIndexesMap,
-1);
this.root = mergeSortNode;
}
} else {
// order by based on device or has aggregation, use DeviceViewNode
// order by based on device, use DeviceViewNode
this.root =
addDeviceViewNode(
orderByParameter,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,38 +712,41 @@ public void orderByTimeWithOffsetTest() {
}

/*
* IdentitySinkNode-40
* └──TransformNode-13
* └──SortNode-12
* └──MergeSort-14
* ├──DeviceView-21
* │ ├──FullOuterTimeJoinNode-17
* │ │ ├──SeriesScanNode-15:[SeriesPath: root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ │ └──SeriesScanNode-16:[SeriesPath: root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──FullOuterTimeJoinNode-20
* │ ├──SeriesScanNode-18:[SeriesPath: root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──SeriesScanNode-19:[SeriesPath: root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* ├──ExchangeNode-34: [SourceAddress:192.0.3.1/test.2.0/37]
* ├──ExchangeNode-35: [SourceAddress:192.0.2.1/test.3.0/38]
* └──ExchangeNode-36: [SourceAddress:192.0.4.1/test.4.0/39]
*
* IdentitySinkNode-37
* └──DeviceView-25
* └──FullOuterTimeJoinNode-24
* ├──SeriesScanNode-22:[SeriesPath: root.sg.d22.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* └──SeriesScanNode-23:[SeriesPath: root.sg.d22.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* IdentitySinkNode-43
* └──TransformNode-12
* └──MergeSort-32
* ├──SortNode-33
* │ └──DeviceView-19
* │ ├──FullOuterTimeJoinNode-15
* │ │ ├──SeriesScanNode-13:[SeriesPath: root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ │ └──SeriesScanNode-14:[SeriesPath: root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──FullOuterTimeJoinNode-18
* │ ├──SeriesScanNode-16:[SeriesPath: root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──SeriesScanNode-17:[SeriesPath: root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* ├──ExchangeNode-37: [SourceAddress:192.0.3.1/test.2.0/40]
* ├──ExchangeNode-38: [SourceAddress:192.0.2.1/test.3.0/41]
* └──ExchangeNode-39: [SourceAddress:192.0.4.1/test.4.0/42]
*
* IdentitySinkNode-38
* └──DeviceView-29
* └──FullOuterTimeJoinNode-28
* ├──SeriesScanNode-26:[SeriesPath: root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* └──SeriesScanNode-27:[SeriesPath: root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* IdentitySinkNode-40
* └──SortNode-34
* └──DeviceView-23
* └──FullOuterTimeJoinNode-22
* ├──SeriesScanNode-20:[SeriesPath: root.sg.d22.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* └──SeriesScanNode-21:[SeriesPath: root.sg.d22.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
*
* IdentitySinkNode-39
* └──DeviceView-33
* └──FullOuterTimeJoinNode-32
* ├──SeriesScanNode-30:[SeriesPath: root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesScanNode-31:[SeriesPath: root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* IdentitySinkNode-41
* └──SortNode-35
* └──DeviceView-27
* └──FullOuterTimeJoinNode-26
* ├──SeriesScanNode-24:[SeriesPath: root.sg.d1.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* └──SeriesScanNode-25:[SeriesPath: root.sg.d1.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
*
* IdentitySinkNode-42
* └──SortNode-36
* └──DeviceView-31
* └──FullOuterTimeJoinNode-30
* ├──SeriesScanNode-28:[SeriesPath: root.sg.d333.s1, DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesScanNode-29:[SeriesPath: root.sg.d333.s2, DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
*/
@Test
public void orderByExpressionTest1() {
Expand All @@ -758,14 +761,29 @@ public void orderByExpressionTest1() {
firstFiRoot = plan.getInstances().get(0).getFragment().getPlanNodeTree();
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TransformNode);
assertTrue(firstFiTopNode.getChildren().get(0) instanceof SortNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof MergeSortNode);
MergeSortNode mergeSortNode =
(MergeSortNode) firstFiTopNode.getChildren().get(0).getChildren().get(0);
assertTrue(mergeSortNode.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(mergeSortNode.getChildren().get(1) instanceof ExchangeNode);
assertTrue(mergeSortNode.getChildren().get(2) instanceof ExchangeNode);
assertTrue(mergeSortNode.getChildren().get(3) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(0) instanceof MergeSortNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof SortNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
instanceof DeviceViewNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(1) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(2) instanceof ExchangeNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(3) instanceof ExchangeNode);
for (int i = 1; i < 4; i++) {
assertTrue(
plan.getInstances().get(i).getFragment().getPlanNodeTree().getChildren().get(0)
instanceof SortNode);
assertTrue(
plan.getInstances()
.get(i)
.getFragment()
.getPlanNodeTree()
.getChildren()
.get(0)
.getChildren()
.get(0)
instanceof DeviceViewNode);
}
for (int i = 0; i < 4; i++) {
assertScanNodeLimitValue(plan.getInstances().get(i).getFragment().getPlanNodeTree(), 0);
}
Expand Down Expand Up @@ -834,35 +852,34 @@ public void orderByExpressionTest2() {
}

/*
* IdentitySinkNode-37
* └──TransformNode-13
* └──SortNode-12
* └──MergeSort-14
* └──DeviceView-15
* ├──AggregationNode-20
* │ ├──SeriesAggregationScanNode-16:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ ├──SeriesAggregationScanNode-18:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──ExchangeNode-31: [SourceAddress:192.0.2.1/test.2.0/34]
* ├──ExchangeNode-33: [SourceAddress:192.0.3.1/test.3.0/35]
* └──AggregationNode-29
* ├──SeriesAggregationScanNode-25:[SeriesPath: root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* ├──SeriesAggregationScanNode-27:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* └──ExchangeNode-32: [SourceAddress:192.0.4.1/test.4.0/36]
* IdentitySinkNode-35
* └──TransformNode-12
* └──SortNode-11
* └──DeviceView-13
* ├──AggregationNode-18
* │ ├──SeriesAggregationScanNode-14:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ ├──SeriesAggregationScanNode-16:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* │ └──ExchangeNode-29: [SourceAddress:192.0.2.1/test.2.0/32]
* ├──ExchangeNode-31: [SourceAddress:192.0.3.1/test.3.0/33]
* └──AggregationNode-27
* ├──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* ├──SeriesAggregationScanNode-25:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:1)]
* └──ExchangeNode-30: [SourceAddress:192.0.4.1/test.4.0/34]
*
* IdentitySinkNode-34
* └──HorizontallyConcatNode-21
* ├──SeriesAggregationScanNode-17:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* └──SeriesAggregationScanNode-19:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* IdentitySinkNode-32
* └──HorizontallyConcatNode-19
* ├──SeriesAggregationScanNode-15:[SeriesPath: root.sg.d1.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
* └──SeriesAggregationScanNode-17:[SeriesPath: root.sg.d1.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:2)]
*
* IdentitySinkNode-35
* └──HorizontallyConcatNode-24
* ├──SeriesAggregationScanNode-22:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* └──SeriesAggregationScanNode-23:[SeriesPath: root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
*
* IdentitySinkNode-36
* └──HorizontallyConcatNode-30
* ├──SeriesAggregationScanNode-26:[SeriesPath: root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesAggregationScanNode-28:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* IdentitySinkNode-33
* └──HorizontallyConcatNode-22
* ├──SeriesAggregationScanNode-20:[SeriesPath: root.sg.d22.s1, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
* └──SeriesAggregationScanNode-21:[SeriesPath: root.sg.d22.s2, Descriptor: [AggregationDescriptor(count, SINGLE)], DataRegion: TConsensusGroupId(type:DataRegion, id:3)]
*
* IdentitySinkNode-34
* └──HorizontallyConcatNode-28
* ├──SeriesAggregationScanNode-24:[SeriesPath: root.sg.d333.s2, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
* └──SeriesAggregationScanNode-26:[SeriesPath: root.sg.d333.s1, Descriptor: [AggregationDescriptor(count, PARTIAL)], DataRegion: TConsensusGroupId(type:DataRegion, id:4)]
*/
@Test
public void orderByExpressionTest3() {
Expand All @@ -879,13 +896,16 @@ public void orderByExpressionTest3() {
firstFiTopNode = firstFiRoot.getChildren().get(0);
assertTrue(firstFiTopNode instanceof TransformNode);
assertTrue(firstFiTopNode.getChildren().get(0) instanceof SortNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof MergeSortNode);
MergeSortNode mergeSortNode =
(MergeSortNode) firstFiTopNode.getChildren().get(0).getChildren().get(0);
assertTrue(mergeSortNode.getChildren().get(0) instanceof DeviceViewNode);
assertTrue(mergeSortNode.getChildren().get(0).getChildren().get(0) instanceof AggregationNode);
assertTrue(mergeSortNode.getChildren().get(0).getChildren().get(1) instanceof ExchangeNode);
assertTrue(mergeSortNode.getChildren().get(0).getChildren().get(2) instanceof AggregationNode);
assertTrue(firstFiTopNode.getChildren().get(0).getChildren().get(0) instanceof DeviceViewNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(0)
instanceof AggregationNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(1)
instanceof ExchangeNode);
assertTrue(
firstFiTopNode.getChildren().get(0).getChildren().get(0).getChildren().get(2)
instanceof AggregationNode);
for (int i = 1; i < 4; i++) {
assertTrue(
plan.getInstances().get(i).getFragment().getPlanNodeTree().getChildren().get(0)
Expand Down

0 comments on commit e452b07

Please sign in to comment.