Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
commit

commit

commit

commit

commit

commit

commit

commit
  • Loading branch information
dtenedor committed Oct 17, 2024
1 parent fed9a8d commit e79e203
Show file tree
Hide file tree
Showing 10 changed files with 1,094 additions and 3 deletions.
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -5036,6 +5036,11 @@
"Invalid partitioning: <cols> is missing or is in a map or array."
]
},
"PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE" : {
"message" : [
"The SQL pipe operator syntax with aggregation (using |> AGGREGATE) does not support <case>."
]
},
"PIVOT_AFTER_GROUP_BY" : {
"message" : [
"PIVOT clause following a GROUP BY clause. Consider pushing the GROUP BY into a subquery."
Expand Down
1 change: 1 addition & 0 deletions docs/sql-ref-ansi-compliance.md
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ Below is a list of all the keywords in Spark SQL.
|--|----------------------|-------------------------|--------|
|ADD|non-reserved|non-reserved|non-reserved|
|AFTER|non-reserved|non-reserved|non-reserved|
|AGGREGATE|non-reserved|non-reserved|non-reserved|
|ALL|reserved|non-reserved|reserved|
|ALTER|non-reserved|non-reserved|reserved|
|ALWAYS|non-reserved|non-reserved|non-reserved|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ BANG: '!';
//--SPARK-KEYWORD-LIST-START
ADD: 'ADD';
AFTER: 'AFTER';
AGGREGATE: 'AGGREGATE';
ALL: 'ALL';
ALTER: 'ALTER';
ALWAYS: 'ALWAYS';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@ temporalClause
aggregationClause
: GROUP BY groupingExpressionsWithGroupingAnalytics+=groupByClause
(COMMA groupingExpressionsWithGroupingAnalytics+=groupByClause)*
| GROUP BY groupingExpressions+=expression (COMMA groupingExpressions+=expression)* (
| GROUP BY groupingExpressions+=namedExpression (COMMA groupingExpressions+=namedExpression)* (
WITH kind=ROLLUP
| WITH kind=CUBE
| kind=GROUPING SETS LEFT_PAREN groupingSet (COMMA groupingSet)* RIGHT_PAREN)?
Expand Down Expand Up @@ -1512,6 +1512,7 @@ operatorPipeRightSide
| sample
| joinRelation
| operator=(UNION | EXCEPT | SETMINUS | INTERSECT) setQuantifier? right=queryTerm
| AGGREGATE namedExpressionSeq? aggregationClause?
;

// When `SQL_standard_keyword_behavior=true`, there are 2 kinds of keywords in Spark SQL.
Expand All @@ -1528,6 +1529,7 @@ ansiNonReserved
//--ANSI-NON-RESERVED-START
: ADD
| AFTER
| AGGREGATE
| ALTER
| ALWAYS
| ANALYZE
Expand Down Expand Up @@ -1850,6 +1852,7 @@ nonReserved
//--DEFAULT-NON-RESERVED-START
: ADD
| AFTER
| AGGREGATE
| ALL
| ALTER
| ALWAYS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ private[sql] object QueryParsingErrors extends DataTypeErrorsBase {
new ParseException(errorClass = "UNSUPPORTED_FEATURE.COMBINATION_QUERY_RESULT_CLAUSES", ctx)
}

def pipeOperatorAggregateUnsupportedCaseError(
caseArgument: String,
ctx: ParserRuleContext): Throwable = {
new ParseException(
errorClass = "UNSUPPORTED_FEATURE.PIPE_OPERATOR_AGGREGATE_UNSUPPORTED_CASE",
messageParameters = Map("case" -> caseArgument),
ctx)
}

def distributeByUnsupportedError(ctx: QueryOrganizationContext): Throwable = {
new ParseException(errorClass = "_LEGACY_ERROR_TEMP_0012", ctx)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1481,7 +1481,7 @@ class AstBuilder extends DataTypeAstBuilder
selectExpressions: Seq[NamedExpression],
query: LogicalPlan): LogicalPlan = withOrigin(ctx) {
if (ctx.groupingExpressionsWithGroupingAnalytics.isEmpty) {
val groupByExpressions = expressionList(ctx.groupingExpressions)
val groupByExpressions = namedExpressionList(ctx.groupingExpressions)
if (ctx.GROUPING != null) {
// GROUP BY ... GROUPING SETS (...)
// `groupByExpressions` can be non-empty for Hive compatibility. It may add extra grouping
Expand Down Expand Up @@ -2172,6 +2172,14 @@ class AstBuilder extends DataTypeAstBuilder
trees.asScala.map(expression).toSeq
}

/**
* Create sequence of named expressions from the given sequence of contexts.
*/
private def namedExpressionList(
trees: java.util.List[NamedExpressionContext]): Seq[Expression] = {
trees.asScala.map(visitNamedExpression).toSeq
}

/**
* Create a star (i.e. all) expression; this selects all elements (in the specified object).
* Both un-targeted (global) and targeted aliases are supported.
Expand Down Expand Up @@ -5924,7 +5932,79 @@ class AstBuilder extends DataTypeAstBuilder
}.getOrElse(Option(ctx.operator).map { c =>
val all = Option(ctx.setQuantifier()).exists(_.ALL != null)
visitSetOperationImpl(left, plan(ctx.right), all, c.getType)
}.get))))))
}.getOrElse(
visitOperatorPipeAggregate(ctx, left)
)))))))
}

private def visitOperatorPipeAggregate(
ctx: OperatorPipeRightSideContext, left: LogicalPlan): LogicalPlan = {
assert(ctx.AGGREGATE != null)
if (ctx.namedExpressionSeq() == null && ctx.aggregationClause() == null) {
operationNotAllowed(
"The AGGREGATE clause requires a list of aggregate expressions " +
"or a list of grouping expressions, or both", ctx)
}
val aggregateExpressions: Seq[NamedExpression] =
Option(ctx.namedExpressionSeq()).map { n: NamedExpressionSeqContext =>
visitNamedExpressionSeq(n).map {
case (e: NamedExpression, _) => e
case (e: Expression, aliasFunc) => UnresolvedAlias(e, aliasFunc)
}
}.getOrElse(Seq.empty)
Option(ctx.aggregationClause()).map { c: AggregationClauseContext =>
withAggregationClause(c, aggregateExpressions, left) match {
case a: Aggregate =>
// GROUP BY ALL, GROUP BY CUBE, GROUPING_ID, GROUPING SETS, and GROUP BY ROLLUP are not
// supported yet.
def error(s: String): Unit =
throw QueryParsingErrors.pipeOperatorAggregateUnsupportedCaseError(s, c)
a.groupingExpressions match {
case Seq(key: UnresolvedAttribute) if key.equalsIgnoreCase("ALL") =>
error("GROUP BY ALL")
case _ =>
}
def visit(e: Expression): Unit = {
e match {
case _: Cube => error("GROUP BY CUBE")
case _: GroupingSets => error("GROUPING SETS")
case _: Rollup => error("GROUP BY ROLLUP")
case f: UnresolvedFunction if f.arguments.length == 1 && f.nameParts.length == 1 =>
Seq("GROUPING", "GROUPING_ID").foreach { name =>
if (f.nameParts.head.equalsIgnoreCase(name)) error(name)
}
case _: WindowSpec => error("window functions")
case _ =>
}
e.children.foreach(visit)
}
a.groupingExpressions.foreach(visit)
a.aggregateExpressions.foreach(visit)
// Non-aggregate expressions are not allowed in place of aggregate functions, even if they
// appear separately in the GROUP BY clause.
val groupingExpressionSet = a.groupingExpressions.toSet
a.aggregateExpressions.foreach { e: Expression =>
if (groupingExpressionSet.contains(e)) {
error("Non-aggregate expressions in place of aggregate functions, even if they " +
"appear separately in the GROUP BY clause")
}
}
// Prepend grouping keys to the list of aggregate functions, since operator pipe AGGREGATE
// clause returns the GROUP BY expressions followed by the list of aggregate functions.
val namedGroupingExpressions: Seq[NamedExpression] =
a.groupingExpressions.map {
case n: NamedExpression => n
case e: Expression => UnresolvedAlias(e, None)
}
a.copy(aggregateExpressions = namedGroupingExpressions ++ a.aggregateExpressions)
}
}.getOrElse {
// This is a table aggregation with no grouping expressions.
Aggregate(
groupingExpressions = Seq.empty,
aggregateExpressions = aggregateExpressions,
child = left)
}
}

/**
Expand Down
Loading

0 comments on commit e79e203

Please sign in to comment.