Skip to content

Commit

Permalink
[IOTDB-6328] Add optimization for aggregation query in align by devic…
Browse files Browse the repository at this point in the history
…e with template situation
  • Loading branch information
Beyyes authored May 28, 2024
1 parent b860e00 commit 1af2c23
Show file tree
Hide file tree
Showing 28 changed files with 1,935 additions and 241 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ aggregation results last_value(temperature) and last_value(status), whereas buck
private Template deviceTemplate;
// when deviceTemplate is not empty and all expressions in this query are templated measurements,
// i.e. no aggregation and arithmetic expression
private boolean onlyQueryTemplateMeasurements = true;
private boolean noWhereAndAggregation = true;
// if it is wildcard query in templated align by device query
private boolean templateWildCardQuery;
// all queried measurementList and schemaList in deviceTemplate.
Expand Down Expand Up @@ -437,8 +437,8 @@ public TSDataType getType(Expression expression) {
return null;
}

if (isAllDevicesInOneTemplate()
&& (isOnlyQueryTemplateMeasurements() || expression instanceof TimeSeriesOperand)) {
if (allDevicesInOneTemplate()
&& (noWhereAndAggregation() || expression instanceof TimeSeriesOperand)) {
TimeSeriesOperand seriesOperand = (TimeSeriesOperand) expression;
return deviceTemplate.getSchemaMap().get(seriesOperand.getPath().getMeasurement()).getType();
}
Expand Down Expand Up @@ -921,7 +921,7 @@ public List<PartialPath> getDeviceList() {
// All Queries Devices Set In One Template
/////////////////////////////////////////////////////////////////////////////////////////////////

public boolean isAllDevicesInOneTemplate() {
public boolean allDevicesInOneTemplate() {
return this.deviceTemplate != null;
}

Expand All @@ -933,12 +933,12 @@ public void setDeviceTemplate(Template template) {
this.deviceTemplate = template;
}

public boolean isOnlyQueryTemplateMeasurements() {
return onlyQueryTemplateMeasurements;
public boolean noWhereAndAggregation() {
return noWhereAndAggregation;
}

public void setOnlyQueryTemplateMeasurements(boolean onlyQueryTemplateMeasurements) {
this.onlyQueryTemplateMeasurements = onlyQueryTemplateMeasurements;
public void setNoWhereAndAggregation(boolean value) {
this.noWhereAndAggregation = value;
}

public List<String> getMeasurementList() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
static final Expression DEVICE_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(DEVICE, TSDataType.TEXT);

static final Expression END_TIME_EXPRESSION =
public static final Expression END_TIME_EXPRESSION =
TimeSeriesOperand.constructColumnHeaderExpression(ENDTIME, TSDataType.INT64);

private final List<String> lastQueryColumnNames =
Expand Down Expand Up @@ -1904,20 +1904,22 @@ private void checkGroupByConditionExpressionType(
&& rightExpression instanceof ConstantOperand)) {
throw new SemanticException(
String.format(
"Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.",
"Please check the keep condition ([%s]), "
+ "it need to be a constant or a compare expression constructed by 'keep' and a long number.",
keepExpression.getExpressionString()));
}
return;
}
if (!(keepExpression instanceof ConstantOperand)) {
throw new SemanticException(
String.format(
"Please check the keep condition ([%s]),it need to be a constant or a compare expression constructed by 'keep' and a long number.",
"Please check the keep condition ([%s]), "
+ "it need to be a constant or a compare expression constructed by 'keep' and a long number.",
keepExpression.getExpressionString()));
}
}

private void analyzeGroupByTime(Analysis analysis, QueryStatement queryStatement) {
static void analyzeGroupByTime(Analysis analysis, QueryStatement queryStatement) {
if (!queryStatement.isGroupByTime()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ private ExpressionTypeAnalyzer() {}
public static TSDataType analyzeExpression(Analysis analysis, Expression expression) {
if (!analysis.getExpressionTypes().containsKey(NodeRef.of(expression))) {
ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
analyzer.analyze(expression, null);

Map<String, IMeasurementSchema> context =
analysis.allDevicesInOneTemplate() ? analysis.getDeviceTemplate().getSchemaMap() : null;
analyzer.analyze(expression, context);

addExpressionTypes(analysis, analyzer);
}
Expand Down Expand Up @@ -96,7 +99,9 @@ public static void analyzeExpressionUsingTemplatedInfo(
Expression expression,
TemplatedInfo templatedInfo) {
ExpressionTypeAnalyzer analyzer = new ExpressionTypeAnalyzer();
analyzer.analyze(expression, templatedInfo.getSchemaMap());

Map<String, IMeasurementSchema> schemaMap = templatedInfo.getSchemaMap();
analyzer.analyze(expression, schemaMap);

types.putAll(analyzer.getExpressionTypes());
}
Expand Down Expand Up @@ -369,6 +374,7 @@ public TSDataType visitTimeSeriesOperand(
return setExpressionType(
timeSeriesOperand, context.get(timeSeriesOperand.getOutputSymbol()).getType());
}

return setExpressionType(timeSeriesOperand, timeSeriesOperand.getPath().getSeriesType());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.queryengine.plan.analyze;

import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.NodeRef;
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.ConstantOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimestampOperand;
import org.apache.iotdb.db.queryengine.plan.expression.multi.FunctionExpression;
import org.apache.iotdb.db.queryengine.plan.statement.component.ResultColumn;
import org.apache.iotdb.db.queryengine.plan.statement.crud.QueryStatement;
import org.apache.iotdb.db.schemaengine.template.Template;

import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.schema.IMeasurementSchema;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.ENDTIME;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.DEVICE_EXPRESSION;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeExpressionType;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeGroupByTime;
import static org.apache.iotdb.db.queryengine.plan.analyze.AnalyzeVisitor.analyzeOutput;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.normalizeExpression;
import static org.apache.iotdb.db.queryengine.plan.analyze.ExpressionAnalyzer.searchAggregationExpressions;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDataPartition;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceToWhere;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeDeviceViewOutput;
import static org.apache.iotdb.db.queryengine.plan.analyze.TemplatedAnalyze.analyzeFrom;
import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.canPushDownLimitOffsetInGroupByTimeForDevice;
import static org.apache.iotdb.db.queryengine.plan.optimization.LimitOffsetPushDown.pushDownLimitOffsetInGroupByTimeForDevice;

/** Methods in this class are used for aggregation, templated with align by device situation. */
public class TemplatedAggregationAnalyze {

static boolean canBuildAggregationPlanUseTemplate(
Analysis analysis,
QueryStatement queryStatement,
IPartitionFetcher partitionFetcher,
ISchemaTree schemaTree,
MPPQueryContext context,
Template template) {

// not support order by expression and non-aligned template
if (queryStatement.hasOrderByExpression() || !template.isDirectAligned()) {
return false;
}

analysis.setNoWhereAndAggregation(false);

List<PartialPath> deviceList = analyzeFrom(queryStatement, schemaTree);

if (canPushDownLimitOffsetInGroupByTimeForDevice(queryStatement)) {
// remove the device which won't appear in resultSet after limit/offset
deviceList = pushDownLimitOffsetInGroupByTimeForDevice(deviceList, queryStatement);
}

List<Pair<Expression, String>> outputExpressions = new ArrayList<>();
boolean valid = analyzeSelect(queryStatement, analysis, outputExpressions, template);
if (!valid) {
analysis.setDeviceTemplate(null);
return false;
}

analyzeDeviceToWhere(analysis, queryStatement);
if (deviceList.isEmpty()) {
analysis.setFinishQueryAfterAnalyze(true);
return true;
}
analysis.setDeviceList(deviceList);

if (analysis.getWhereExpression() != null
&& ConstantOperand.FALSE.equals(analysis.getWhereExpression())) {
analyzeOutput(analysis, queryStatement, outputExpressions);
analysis.setFinishQueryAfterAnalyze(true);
return true;
}

valid = analyzeHaving(analysis, queryStatement);
if (!valid) {
analysis.setDeviceTemplate(null);
return false;
}

analyzeDeviceToExpressions(analysis);

analyzeDeviceViewOutput(analysis, queryStatement);

// generate result set header according to output expressions
analyzeOutput(analysis, queryStatement, outputExpressions);

analyzeGroupByTime(analysis, queryStatement);
context.generateGlobalTimeFilter(analysis);

// fetch partition information
analyzeDataPartition(analysis, schemaTree, partitionFetcher, context.getGlobalTimeFilter());
return true;
}

private static boolean analyzeSelect(
QueryStatement queryStatement,
Analysis analysis,
List<Pair<Expression, String>> outputExpressions,
Template template) {
LinkedHashSet<Expression> selectExpressions = new LinkedHashSet<>();
selectExpressions.add(DEVICE_EXPRESSION);
if (queryStatement.isOutputEndTime()) {
return false;
}

analysis.setDeviceTemplate(template);

ColumnPaginationController paginationController =
new ColumnPaginationController(
queryStatement.getSeriesLimit(), queryStatement.getSeriesOffset());

Set<Expression> aggregationExpressions = new LinkedHashSet<>();
for (ResultColumn resultColumn : queryStatement.getSelectComponent().getResultColumns()) {
if (paginationController.hasCurOffset()) {
paginationController.consumeOffset();
} else if (paginationController.hasCurLimit()) {
Expression selectExpression = resultColumn.getExpression();
outputExpressions.add(new Pair<>(selectExpression, resultColumn.getAlias()));
selectExpressions.add(selectExpression);
aggregationExpressions.add(selectExpression);
if (selectExpression instanceof FunctionExpression
&& "count_time"
.equalsIgnoreCase(((FunctionExpression) selectExpression).getFunctionName())) {
analysis.getExpressionTypes().put(NodeRef.of(selectExpression), TSDataType.INT64);
((FunctionExpression) selectExpression)
.setExpressions(Collections.singletonList(new TimestampOperand()));
} else {
analyzeExpressionType(analysis, selectExpression);
}
} else {
break;
}
}

List<String> measurementList = new ArrayList<>();
List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
Set<String> measurementSet = new HashSet<>();

if (queryStatement.isCountTimeAggregation()) {
measurementList = new ArrayList<>(template.getSchemaMap().keySet());
measurementSchemaList = new ArrayList<>(template.getSchemaMap().values());
} else {
int idx = 0;
for (Expression selectExpression : selectExpressions) {
idx++;
if (idx == 1
|| (idx == 2 && ENDTIME.equalsIgnoreCase(selectExpression.getOutputSymbol()))) {
continue;
}

String measurement = selectExpression.getExpressions().get(0).getOutputSymbol();
// not support agg(*), agg(s1+1) now
if (!template.getSchemaMap().containsKey(measurement)) {
return false;
}

// for agg1(s1) + agg2(s1), only record s1 for one time
if (!measurementSet.contains(measurement)) {
measurementSet.add(measurement);
measurementList.add(measurement);
measurementSchemaList.add(template.getSchemaMap().get(measurement));
}
}
}

analysis.setMeasurementList(measurementList);
analysis.setMeasurementSchemaList(measurementSchemaList);
analysis.setAggregationExpressions(aggregationExpressions);
analysis.setOutputExpressions(outputExpressions);
analysis.setSelectExpressions(selectExpressions);
return true;
}

private static boolean analyzeHaving(Analysis analysis, QueryStatement queryStatement) {
if (!queryStatement.hasHaving()) {
return true;
}

Set<String> measurementSet = new HashSet<>(analysis.getMeasurementList());
Set<Expression> aggregationExpressions = analysis.getAggregationExpressions();
Expression havingExpression = queryStatement.getHavingCondition().getPredicate();
for (Expression aggregationExpression : searchAggregationExpressions(havingExpression)) {
Expression normalizedAggregationExpression = normalizeExpression(aggregationExpression);

// not support having agg(s1+s2) temporarily
if (!((normalizedAggregationExpression).getExpressions().get(0)
instanceof TimeSeriesOperand)) {
return false;
}

String measurement =
normalizedAggregationExpression.getExpressions().get(0).getOutputSymbol();
if (!measurementSet.contains(measurement)) {
// adapt this case: select agg(s1) from xx having agg(s3)
measurementSet.add(measurement);
analysis.getMeasurementList().add(measurement);
analysis
.getMeasurementSchemaList()
.add(analysis.getDeviceTemplate().getSchema(measurement));
}

analyzeExpressionType(analysis, aggregationExpression);
analyzeExpressionType(analysis, normalizedAggregationExpression);

aggregationExpressions.add(aggregationExpression);
}

TSDataType outputType = analyzeExpressionType(analysis, havingExpression);
if (outputType != TSDataType.BOOLEAN) {
throw new SemanticException(
String.format(
"The output type of the expression in HAVING clause should be BOOLEAN, actual data type: %s.",
outputType));
}
analysis.setHavingExpression(havingExpression);

return true;
}

private static void analyzeDeviceToExpressions(Analysis analysis) {
analysis.setDeviceToSourceTransformExpressions(analysis.getDeviceToSelectExpressions());

analysis.setDeviceToSourceExpressions(analysis.getDeviceToSelectExpressions());
analysis.setDeviceToOutputExpressions(analysis.getDeviceToSelectExpressions());

analysis.setDeviceToAggregationExpressions(analysis.getDeviceToSelectExpressions());
}
}
Loading

0 comments on commit 1af2c23

Please sign in to comment.