diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index f6317d731c77b..1ca24a1fddfc5 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -5036,6 +5036,11 @@ "Invalid partitioning: is missing or is in a map or array." ] }, + "PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE" : { + "message" : [ + "The SQL pipe operator syntax with aggregation (using |> AGGREGATE) does not support ." + ] + }, "PIVOT_AFTER_GROUP_BY" : { "message" : [ "PIVOT clause following a GROUP BY clause. Consider pushing the GROUP BY into a subquery." diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index b4446b1538cd6..3edc5f97c39e7 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -400,6 +400,7 @@ Below is a list of all the keywords in Spark SQL. |--|----------------------|-------------------------|--------| |ADD|non-reserved|non-reserved|non-reserved| |AFTER|non-reserved|non-reserved|non-reserved| +|AGGREGATE|non-reserved|non-reserved|non-reserved| |ALL|reserved|non-reserved|reserved| |ALTER|non-reserved|non-reserved|reserved| |ALWAYS|non-reserved|non-reserved|non-reserved| diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 7391e8c353dee..085e723d02bc0 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -120,6 +120,7 @@ BANG: '!'; //--SPARK-KEYWORD-LIST-START ADD: 'ADD'; AFTER: 'AFTER'; +AGGREGATE: 'AGGREGATE'; ALL: 'ALL'; ALTER: 'ALTER'; ALWAYS: 'ALWAYS'; diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 9d237f069132a..cd30c6ef757f5 100644 --- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -765,7 +765,7 @@ temporalClause aggregationClause : GROUP BY groupingExpressionsWithGroupingAnalytics+=groupByClause (COMMA groupingExpressionsWithGroupingAnalytics+=groupByClause)* - | GROUP BY groupingExpressions+=expression (COMMA groupingExpressions+=expression)* ( + | GROUP BY groupingExpressions+=namedExpression (COMMA groupingExpressions+=namedExpression)* ( WITH kind=ROLLUP | WITH kind=CUBE | kind=GROUPING SETS LEFT_PAREN groupingSet (COMMA groupingSet)* RIGHT_PAREN)? @@ -1512,6 +1512,7 @@ operatorPipeRightSide | sample | joinRelation | operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? right=queryTerm + | AGGREGATE namedExpressionSeq? aggregationClause? ; // When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL. @@ -1528,6 +1529,7 @@ ansiNonReserved //--ANSI-NON-RESERVED-START : ADD | AFTER + | AGGREGATE | ALTER | ALWAYS | ANALYZE @@ -1850,6 +1852,7 @@ nonReserved //--DEFAULT-NON-RESERVED-START : ADD | AFTER + | AGGREGATE | ALL | ALTER | ALWAYS diff --git a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala index b0743d6de4772..0dd366f649c2b 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala @@ -86,6 +86,15 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase { new ParseException(errorClass = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES", ctx) } + def pipeOperatorAggregateUnsupportedCaseError( + caseArgument: String, + ctx: ParserRuleContext): Throwable = { + new ParseException( + errorClass = "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + messageParameters = Map("case" -> caseArgument), + ctx) + } + def distributeByUnsupportedError(ctx: QueryOrganizationContext): Throwable = { new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0012", ctx) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c9150b8a26100..0c427316217c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1481,7 +1481,7 @@ class AstBuilder extends DataTypeAstBuilder selectExpressions: Seq[NamedExpression], query: LogicalPlan): LogicalPlan = withOrigin(ctx) { if (ctx.groupingExpressionsWithGroupingAnalytics.isEmpty) { - val groupByExpressions = expressionList(ctx.groupingExpressions) + val groupByExpressions = namedExpressionList(ctx.groupingExpressions) if (ctx.GROUPING != null) { // GROUP BY ... GROUPING SETS (...) // `groupByExpressions` can be non-empty for Hive compatibility. It may add extra grouping @@ -2172,6 +2172,14 @@ class AstBuilder extends DataTypeAstBuilder trees.asScala.map(expression).toSeq } + /** + * Create sequence of named expressions from the given sequence of contexts. + */ + private def namedExpressionList( + trees: java.util.List[NamedExpressionContext]): Seq[Expression] = { + trees.asScala.map(visitNamedExpression).toSeq + } + /** * Create a star (i.e. all) expression; this selects all elements (in the specified object). * Both un-targeted (global) and targeted aliases are supported. @@ -5924,7 +5932,79 @@ class AstBuilder extends DataTypeAstBuilder }.getOrElse(Option(ctx.operator).map { c => val all = Option(ctx.setQuantifier()).exists(_.ALL != null) visitSetOperationImpl(left, plan(ctx.right), all, c.getType) - }.get)))))) + }.getOrElse( + visitOperatorPipeAggregate(ctx, left) + ))))))) + } + + private def visitOperatorPipeAggregate( + ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = { + assert(ctx.AGGREGATE != null) + if (ctx.namedExpressionSeq() == null && ctx.aggregationClause() == null) { + operationNotAllowed( + "The AGGREGATE clause requires a list of aggregate expressions " + + "or a list of grouping expressions, or both", ctx) + } + val aggregateExpressions: Seq[NamedExpression] = + Option(ctx.namedExpressionSeq()).map { n: NamedExpressionSeqContext => + visitNamedExpressionSeq(n).map { + case (e: NamedExpression, _) => e + case (e: Expression, aliasFunc) => UnresolvedAlias(e, aliasFunc) + } + }.getOrElse(Seq.empty) + Option(ctx.aggregationClause()).map { c: AggregationClauseContext => + withAggregationClause(c, aggregateExpressions, left) match { + case a: Aggregate => + // GROUP BY ALL, GROUP BY CUBE, GROUPING_ID, GROUPING SETS, and GROUP BY ROLLUP are not + // supported yet. + def error(s: String): Unit = + throw QueryParsingErrors.pipeOperatorAggregateUnsupportedCaseError(s, c) + a.groupingExpressions match { + case Seq(key: UnresolvedAttribute) if key.equalsIgnoreCase("ALL") => + error("GROUP BY ALL") + case _ => + } + def visit(e: Expression): Unit = { + e match { + case _: Cube => error("GROUP BY CUBE") + case _: GroupingSets => error("GROUPING SETS") + case _: Rollup => error("GROUP BY ROLLUP") + case f: UnresolvedFunction if f.arguments.length == 1 && f.nameParts.length == 1 => + Seq("GROUPING", "GROUPING_ID").foreach { name => + if (f.nameParts.head.equalsIgnoreCase(name)) error(name) + } + case _: WindowSpec => error("window functions") + case _ => + } + e.children.foreach(visit) + } + a.groupingExpressions.foreach(visit) + a.aggregateExpressions.foreach(visit) + // Non-aggregate expressions are not allowed in place of aggregate functions, even if they + // appear separately in the GROUP BY clause. + val groupingExpressionSet = a.groupingExpressions.toSet + a.aggregateExpressions.foreach { e: Expression => + if (groupingExpressionSet.contains(e)) { + error("Non-aggregate expressions in place of aggregate functions, even if they " + + "appear separately in the GROUP BY clause") + } + } + // Prepend grouping keys to the list of aggregate functions, since operator pipe AGGREGATE + // clause returns the GROUP BY expressions followed by the list of aggregate functions. + val namedGroupingExpressions: Seq[NamedExpression] = + a.groupingExpressions.map { + case n: NamedExpression => n + case e: Expression => UnresolvedAlias(e, None) + } + a.copy(aggregateExpressions = namedGroupingExpressions ++ a.aggregateExpressions) + } + }.getOrElse { + // This is a table aggregation with no grouping expressions. + Aggregate( + groupingExpressions = Seq.empty, + aggregateExpressions = aggregateExpressions, + child = left) + } } /** diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index 7fa4ec0514ff0..28c44ee3f3a43 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -2022,6 +2022,424 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException } +-- !query +select 1 as x, 2 as y +|> aggregate group by x, y +-- !query analysis +Aggregate [x#x, y#x], [x#x, y#x] ++- Project [1 AS x#x, 2 AS y#x] + +- OneRowRelation + + +-- !query +select 3 as x, 4 as y +|> aggregate group by 1, 2 +-- !query analysis +Aggregate [1, 2], [1 AS 1#x, 2 AS 2#x] ++- Project [3 AS x#x, 4 AS y#x] + +- OneRowRelation + + +-- !query +table other +|> aggregate sum(b) as result group by a +-- !query analysis +Aggregate [a#x], [a#x, sum(b#x) AS result#xL] ++- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +table t +|> aggregate sum(x) +-- !query analysis +Aggregate [sum(x#x) AS sum(x)#xL] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> aggregate sum(x) + 1 as result_plus_one +-- !query analysis +Aggregate [(sum(x#x) + cast(1 as bigint)) AS result_plus_one#xL] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table other +|> aggregate group by a +|> where a = 1 +-- !query analysis +Filter (a#x = 1) ++- SubqueryAlias __auto_generated_subquery_name + +- Aggregate [a#x], [a#x] + +- SubqueryAlias spark_catalog.default.other + +- Relation spark_catalog.default.other[a#x,b#x] json + + +-- !query +select 1 as x, 2 as y, 3 as z +|> aggregate group by x, y, x + y as z +-- !query analysis +Aggregate [x#x, y#x, (x#x + y#x)], [x#x, y#x, (x#x + y#x) AS z#x] ++- Project [1 AS x#x, 2 AS y#x, 3 AS z#x] + +- OneRowRelation + + +-- !query +select 1 as x, 2 as y, 3 as z +|> aggregate group by x as z, x + y as z +-- !query analysis +Aggregate [x#x, (x#x + y#x)], [x#x AS z#x, (x#x + y#x) AS z#x] ++- Project [1 AS x#x, 2 AS y#x, 3 AS z#x] + +- OneRowRelation + + +-- !query +select 1 as x, 2 as y, named_struct('z', 3) as st +|> aggregate group by x, y, x, x, st.z, (st).z, 1 + x, 2 + x +-- !query analysis +Aggregate [x#x, y#x, x#x, x#x, st#x.z, st#x.z, (1 + x#x), (2 + x#x)], [x#x, y#x, x#x, x#x, st#x.z AS z#x, st#x.z AS z#x, (1 + x#x) AS (1 + x)#x, (2 + x#x) AS (2 + x)#x] ++- Project [1 AS x#x, 2 AS y#x, named_struct(z, 3) AS st#x] + +- OneRowRelation + + +-- !query +select 1 x, 2 y, 3 z +|> aggregate sum(z) z group by x, y +|> aggregate avg(z) z group by x +|> aggregate count(distinct z) c +-- !query analysis +Aggregate [count(distinct z#x) AS c#xL] ++- Aggregate [x#x], [x#x, avg(z#xL) AS z#x] + +- Aggregate [x#x, y#x], [x#x, y#x, sum(z#x) AS z#xL] + +- Project [1 AS x#x, 2 AS y#x, 3 AS z#x] + +- OneRowRelation + + +-- !query +select 1 x, 3 z +|> aggregate count(*) group by x, z, x +|> select x +-- !query analysis +Project [x#x] ++- Aggregate [x#x, z#x, x#x], [x#x, z#x, x#x, count(1) AS count(1)#xL] + +- Project [1 AS x#x, 3 AS z#x] + +- OneRowRelation + + +-- !query +select 3 as x, 4 as y +|> aggregate group by all +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUP BY ALL" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 47, + "fragment" : "select 3 as x, 4 as y\n|> aggregate group by all" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings) group by rollup(course, `year`) +|> where course = 'dotNET' and `year` = '2013' +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUP BY ROLLUP" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 123, + "fragment" : "table courseSales\n|> aggregate sum(earnings) group by rollup(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings) group by cube(course, `year`) +|> where course = 'dotNET' and `year` = '2013' +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUP BY CUBE" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 121, + "fragment" : "table courseSales\n|> aggregate sum(earnings) group by cube(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings) group by course, `year` grouping sets(course, `year`) +|> where course = 'dotNET' and `year` = '2013' +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUPING SETS" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 145, + "fragment" : "table courseSales\n|> aggregate sum(earnings) group by course, `year` grouping sets(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings), grouping(course) + 1 + group by course +|> where course = 'dotNET' and `year` = '2013' +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUPING" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 132, + "fragment" : "table courseSales\n|> aggregate sum(earnings), grouping(course) + 1\n group by course\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings), grouping_id(course) + group by course +|> where course = 'dotNET' and `year` = '2013' +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUPING_ID" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 131, + "fragment" : "table courseSales\n|> aggregate sum(earnings), grouping_id(course)\n group by course\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +select 1 as x, 2 as y +|> aggregate group by () +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "')'", + "hint" : "" + } +} + + +-- !query +table other +|> aggregate a +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 26, + "fragment" : "table other\n|> aggregate a" + } ] +} + + +-- !query +table other +|> aggregate a group by a +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "Non-aggregate expressions in place of aggregate functions, even if they appear separately in the GROUP BY clause" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 37, + "fragment" : "table other\n|> aggregate a group by a" + } ] +} + + +-- !query +table other +|> select sum(a) as result +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : "sum(a#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 23, + "stopIndex" : 28, + "fragment" : "sum(a)" + } ] +} + + +-- !query +table other +|> aggregate +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_0035", + "messageParameters" : { + "message" : "The AGGREGATE clause requires a list of aggregate expressions or a list of grouping expressions, or both" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 24, + "fragment" : "table other\n|> aggregate" + } ] +} + + +-- !query +table other +|> aggregate group by +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION", + "sqlState" : "42703", + "messageParameters" : { + "objectName" : "`group`", + "proposal" : "`a`, `b`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 26, + "stopIndex" : 30, + "fragment" : "group" + } ] +} + + +-- !query +table other +|> group by a +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'group'", + "hint" : "" + } +} + + +-- !query +table other +|> aggregate sum(a) over () group by b +-- !query analysis +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "window functions" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 50, + "fragment" : "table other\n|> aggregate sum(a) over () group by b" + } ] +} + + +-- !query +select 1 x, 2 y, 3 z +|> aggregate count(*) AS c, sum(x) AS x group by x +|> where c = 1 +|> where x = 1 +-- !query analysis +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "AMBIGUOUS_REFERENCE", + "sqlState" : "42704", + "messageParameters" : { + "name" : "`x`", + "referenceNames" : "[`x`, `x`]" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 97, + "stopIndex" : 97, + "fragment" : "x" + } ] +} + + -- !query drop table t -- !query analysis diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index 61890f5cb146d..1ab489e0359dd 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -571,6 +571,130 @@ table t table t |> union all table st; +-- Aggregation operators: positive tests. +----------------------------------------- + +-- Basic aggregation on a constant table. +select 1 as x, 2 as y +|> aggregate group by x, y; + +-- Basic aggregation with group by ordinals. +select 3 as x, 4 as y +|> aggregate group by 1, 2; + +-- Basic aggregation with a GROUP BY clause. +table other +|> aggregate sum(b) as result group by a; + +-- Basic table aggregation. +table t +|> aggregate sum(x); + +-- Basic table aggregation with an alias. +table t +|> aggregate sum(x) + 1 as result_plus_one; + +-- Grouping with no aggregate functions. +table other +|> aggregate group by a +|> where a = 1; + +-- Group by an expression on columns, all of which are already grouped. +select 1 as x, 2 as y, 3 as z +|> aggregate group by x, y, x + y as z; + +-- Group by an expression on columns, some of which (y) aren't already grouped. +select 1 as x, 2 as y, 3 as z +|> aggregate group by x as z, x + y as z; + +-- We get an output column for each item in GROUP BY, even when they are duplicate expressions. +select 1 as x, 2 as y, named_struct('z', 3) as st +|> aggregate group by x, y, x, x, st.z, (st).z, 1 + x, 2 + x; + +-- Chained aggregates. +select 1 x, 2 y, 3 z +|> aggregate sum(z) z group by x, y +|> aggregate avg(z) z group by x +|> aggregate count(distinct z) c; + +-- Ambiguous name from duplicate GROUP BY item. This is generally allowed. +select 1 x, 3 z +|> aggregate count(*) group by x, z, x +|> select x; + +-- Aggregation operators: negative tests. +----------------------------------------- + +-- GROUP BY ALL is not currently supported. +select 3 as x, 4 as y +|> aggregate group by all; + +-- GROUP BY ROLLUP is not supported yet. +table courseSales +|> aggregate sum(earnings) group by rollup(course, `year`) +|> where course = 'dotNET' and `year` = '2013'; + +-- GROUP BY CUBE is not supported yet. +table courseSales +|> aggregate sum(earnings) group by cube(course, `year`) +|> where course = 'dotNET' and `year` = '2013'; + +-- GROUPING SETS is not supported yet. +table courseSales +|> aggregate sum(earnings) group by course, `year` grouping sets(course, `year`) +|> where course = 'dotNET' and `year` = '2013'; + +-- GROUPING/GROUPING_ID is not supported yet. +table courseSales +|> aggregate sum(earnings), grouping(course) + 1 + group by course +|> where course = 'dotNET' and `year` = '2013'; + +-- GROUPING/GROUPING_ID is not supported yet. +table courseSales +|> aggregate sum(earnings), grouping_id(course) + group by course +|> where course = 'dotNET' and `year` = '2013'; + +-- GROUP BY () is not valid syntax. +select 1 as x, 2 as y +|> aggregate group by (); + +-- Non-aggregate expressions are not allowed in place of aggregate functions. +table other +|> aggregate a; + +-- Non-aggregate expressions are not allowed in place of aggregate functions, even if they appear +-- separately in the GROUP BY clause. +table other +|> aggregate a group by a; + +-- Using aggregate functions without the AGGREGATE keyword is not allowed. +table other +|> select sum(a) as result; + +-- The AGGREGATE keyword requires a GROUP BY clause and/or aggregation function(s). +table other +|> aggregate; + +-- The AGGREGATE GROUP BY list cannot be empty. +table other +|> aggregate group by; + +-- The AGGREGATE keyword is required to perform grouping. +table other +|> group by a; + +-- Window functions are not allowed in the AGGREGATE expression list. +table other +|> aggregate sum(a) over () group by b; + +-- Ambiguous name from AGGREGATE list vs GROUP BY. +select 1 x, 2 y, 3 z +|> aggregate count(*) AS c, sum(x) AS x group by x +|> where c = 1 +|> where x = 1; + -- Cleanup. ----------- drop table t; diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index 8cbc5357d78b6..034059d196774 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -1673,6 +1673,450 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException } +-- !query +select 1 as x, 2 as y +|> aggregate group by x, y +-- !query schema +struct +-- !query output +1 2 + + +-- !query +select 3 as x, 4 as y +|> aggregate group by 1, 2 +-- !query schema +struct<1:int,2:int> +-- !query output +1 2 + + +-- !query +table other +|> aggregate sum(b) as result group by a +-- !query schema +struct +-- !query output +1 3 +2 4 + + +-- !query +table t +|> aggregate sum(x) +-- !query schema +struct +-- !query output +1 + + +-- !query +table t +|> aggregate sum(x) + 1 as result_plus_one +-- !query schema +struct +-- !query output +2 + + +-- !query +table other +|> aggregate group by a +|> where a = 1 +-- !query schema +struct +-- !query output +1 + + +-- !query +select 1 as x, 2 as y, 3 as z +|> aggregate group by x, y, x + y as z +-- !query schema +struct +-- !query output +1 2 3 + + +-- !query +select 1 as x, 2 as y, 3 as z +|> aggregate group by x as z, x + y as z +-- !query schema +struct +-- !query output +1 3 + + +-- !query +select 1 as x, 2 as y, named_struct('z', 3) as st +|> aggregate group by x, y, x, x, st.z, (st).z, 1 + x, 2 + x +-- !query schema +struct +-- !query output +1 2 1 1 3 3 2 3 + + +-- !query +select 1 x, 2 y, 3 z +|> aggregate sum(z) z group by x, y +|> aggregate avg(z) z group by x +|> aggregate count(distinct z) c +-- !query schema +struct +-- !query output +1 + + +-- !query +select 1 x, 3 z +|> aggregate count(*) group by x, z, x +|> select x +-- !query schema +struct +-- !query output +1 + + +-- !query +select 3 as x, 4 as y +|> aggregate group by all +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUP BY ALL" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 47, + "fragment" : "select 3 as x, 4 as y\n|> aggregate group by all" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings) group by rollup(course, `year`) +|> where course = 'dotNET' and `year` = '2013' +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUP BY ROLLUP" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 123, + "fragment" : "table courseSales\n|> aggregate sum(earnings) group by rollup(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings) group by cube(course, `year`) +|> where course = 'dotNET' and `year` = '2013' +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUP BY CUBE" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 121, + "fragment" : "table courseSales\n|> aggregate sum(earnings) group by cube(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings) group by course, `year` grouping sets(course, `year`) +|> where course = 'dotNET' and `year` = '2013' +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUPING SETS" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 145, + "fragment" : "table courseSales\n|> aggregate sum(earnings) group by course, `year` grouping sets(course, `year`)\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings), grouping(course) + 1 + group by course +|> where course = 'dotNET' and `year` = '2013' +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUPING" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 132, + "fragment" : "table courseSales\n|> aggregate sum(earnings), grouping(course) + 1\n group by course\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +table courseSales +|> aggregate sum(earnings), grouping_id(course) + group by course +|> where course = 'dotNET' and `year` = '2013' +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "GROUPING_ID" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 131, + "fragment" : "table courseSales\n|> aggregate sum(earnings), grouping_id(course)\n group by course\n|> where course = 'dotNET' and `year` = '2013'" + } ] +} + + +-- !query +select 1 as x, 2 as y +|> aggregate group by () +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "')'", + "hint" : "" + } +} + + +-- !query +table other +|> aggregate a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "MISSING_GROUP_BY", + "sqlState" : "42803", + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 26, + "fragment" : "table other\n|> aggregate a" + } ] +} + + +-- !query +table other +|> aggregate a group by a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "Non-aggregate expressions in place of aggregate functions, even if they appear separately in the GROUP BY clause" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 37, + "fragment" : "table other\n|> aggregate a group by a" + } ] +} + + +-- !query +table other +|> select sum(a) as result +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", + "messageParameters" : { + "expr" : "sum(a#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 23, + "stopIndex" : 28, + "fragment" : "sum(a)" + } ] +} + + +-- !query +table other +|> aggregate +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "_LEGACY_ERROR_TEMP_0035", + "messageParameters" : { + "message" : "The AGGREGATE clause requires a list of aggregate expressions or a list of grouping expressions, or both" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 24, + "fragment" : "table other\n|> aggregate" + } ] +} + + +-- !query +table other +|> aggregate group by +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "UNRESOLVED_COLUMN.WITH_SUGGESTION", + "sqlState" : "42703", + "messageParameters" : { + "objectName" : "`group`", + "proposal" : "`a`, `b`" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 26, + "stopIndex" : 30, + "fragment" : "group" + } ] +} + + +-- !query +table other +|> group by a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { + "error" : "'group'", + "hint" : "" + } +} + + +-- !query +table other +|> aggregate sum(a) over () group by b +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE", + "sqlState" : "0A000", + "messageParameters" : { + "case" : "window functions" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 1, + "stopIndex" : 50, + "fragment" : "table other\n|> aggregate sum(a) over () group by b" + } ] +} + + +-- !query +select 1 x, 2 y, 3 z +|> aggregate count(*) AS c, sum(x) AS x group by x +|> where c = 1 +|> where x = 1 +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.AnalysisException +{ + "errorClass" : "AMBIGUOUS_REFERENCE", + "sqlState" : "42704", + "messageParameters" : { + "name" : "`x`", + "referenceNames" : "[`x`, `x`]" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 97, + "stopIndex" : 97, + "fragment" : "x" + } ] +} + + -- !query drop table t -- !query schema diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala index fc1c9c6755572..ed1f5822ca85d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala @@ -955,6 +955,12 @@ class SparkSqlParserSuite extends AnalysisTest with SharedSparkSession { checkExcept("TABLE t |> MINUS DISTINCT TABLE t") checkIntersect("TABLE t |> INTERSECT ALL TABLE t") checkUnion("TABLE t |> UNION ALL TABLE t") + // Aggregation + parser.parsePlan("SELECT a, b FROM t |> AGGREGATE SUM(a)") + parser.parsePlan("SELECT a, b FROM t |> AGGREGATE SUM(a) AS result GROUP BY b") + parser.parsePlan("SELECT a, b FROM t |> AGGREGATE GROUP BY b") + parser.parsePlan("SELECT a, b FROM t |> AGGREGATE b, COUNT(*) AS result GROUP BY b") + parser.parsePlan("SELECT a, b FROM t |> AGGREGATE COUNT(*) AS result GROUP BY b WITH ROLLUP") } } }