diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index f090e85f1706e..5f94118c8eb73 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -425,7 +425,8 @@ class AstBuilder extends DataTypeAstBuilder ctx.aggregationClause, ctx.havingClause, ctx.windowClause, - plan + plan, + isPipeOperatorSelect = false ) } } @@ -1013,7 +1014,8 @@ class AstBuilder extends DataTypeAstBuilder ctx.aggregationClause, ctx.havingClause, ctx.windowClause, - from + from, + isPipeOperatorSelect = false ) } @@ -1100,7 +1102,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause, havingClause, windowClause, - isDistinct = false) + isDistinct = false, + isPipeOperatorSelect = false) ScriptTransformation( string(visitStringLit(transformClause.script)), @@ -1121,6 +1124,8 @@ class AstBuilder extends DataTypeAstBuilder * Add a regular (SELECT) query specification to a logical plan. The query specification * is the core of the logical plan, this is where sourcing (FROM clause), projection (SELECT), * aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place. + * If 'isPipeOperatorSelect' is true, wraps each projected expression with a [[PipeSelect]] + * expression for future validation of the expressions during analysis. * * Note that query hints are ignored (both by the parser and the builder). */ @@ -1132,7 +1137,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause: AggregationClauseContext, havingClause: HavingClauseContext, windowClause: WindowClauseContext, - relation: LogicalPlan): LogicalPlan = withOrigin(ctx) { + relation: LogicalPlan, + isPipeOperatorSelect: Boolean): LogicalPlan = withOrigin(ctx) { val isDistinct = selectClause.setQuantifier() != null && selectClause.setQuantifier().DISTINCT() != null @@ -1144,7 +1150,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause, havingClause, windowClause, - isDistinct) + isDistinct, + isPipeOperatorSelect) // Hint selectClause.hints.asScala.foldRight(plan)(withHints) @@ -1158,7 +1165,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause: AggregationClauseContext, havingClause: HavingClauseContext, windowClause: WindowClauseContext, - isDistinct: Boolean): LogicalPlan = { + isDistinct: Boolean, + isPipeOperatorSelect: Boolean): LogicalPlan = { // Add lateral views. val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate) @@ -1172,7 +1180,20 @@ class AstBuilder extends DataTypeAstBuilder } def createProject() = if (namedExpressions.nonEmpty) { - Project(namedExpressions, withFilter) + val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) { + // If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] expression wrapping + // each alias in the project list, so the analyzer can check invariants later. + namedExpressions.map { + case a: Alias => + a.withNewChildren(Seq(PipeSelect(a.child))) + .asInstanceOf[NamedExpression] + case other => + other + } + } else { + namedExpressions + } + Project(newProjectList, withFilter) } else { withFilter } @@ -5721,16 +5742,6 @@ class AstBuilder extends DataTypeAstBuilder operationNotAllowed("Operator pipe SQL syntax using |>", ctx) } Option(ctx.selectClause).map { c => - def updateProject(p: Project): Project = { - val newProjectList: Seq[NamedExpression] = p.projectList.map { - case a: Alias => - a.withNewChildren(Seq(PipeSelect(a.child))) - .asInstanceOf[NamedExpression] - case other => - other - } - p.copy(projectList = newProjectList) - } withSelectQuerySpecification( ctx = ctx, selectClause = c, @@ -5739,16 +5750,8 @@ class AstBuilder extends DataTypeAstBuilder aggregationClause = null, havingClause = null, windowClause = null, - left) match { - // The input should generally be a projection since we only pass a context for the SELECT - // clause here and pass "null" for all other clauses. - case p: Project => - updateProject(p) - case d @ Distinct(p: Project) => - d.copy(child = updateProject(p)) - case other => - throw SparkException.internalError(s"Unrecognized matched logical plan: $other") - } + relation = left, + isPipeOperatorSelect = true) }.getOrElse(Option(ctx.whereClause).map { c => // Add a table subquery boundary between the new filter and the input plan if one does not // already exist. This helps the analyzer behave as if we had added the WHERE clause after a diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out index a751354fb45db..f5d2e236b6d30 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out @@ -246,20 +246,72 @@ Distinct -- !query -<<<<<<< HEAD +table t +|> select * +-- !query analysis +Project [x#x, y#x] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select * except (y) +-- !query analysis +Project [x#x] ++- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select /*+ repartition(3) */ * +-- !query analysis +Repartition 3, true ++- Project [x#x, y#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select /*+ repartition(3) */ distinct x +-- !query analysis +Repartition 3, true ++- Distinct + +- Project [x#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv + + +-- !query +table t +|> select /*+ repartition(3) */ all x +-- !query analysis +Repartition 3, true ++- Project [x#x] + +- SubqueryAlias spark_catalog.default.t + +- Relation spark_catalog.default.t[x#x,y#x] csv +-- !query table t |> select sum(x) as result -- !query analysis -org.apache.spark.sql.catalyst.parser.ParseException +org.apache.spark.sql.AnalysisException { - "errorClass" : "PARSE_SYNTAX_ERROR", - "sqlState" : "42601", + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", "messageParameters" : { - "error" : "'<<'", - "hint" : "" - } + "expr" : "sum(x#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 19, + "stopIndex" : 24, + "fragment" : "sum(x)" + } ] } diff --git a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql index a2fe9f54d6db8..b51d4c03fc5da 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql @@ -82,7 +82,22 @@ table t table t |> select distinct x, y; -<<<<<<< HEAD +-- SELECT * is supported. +table t +|> select *; + +table t +|> select * except (y); + +-- Hints are supported. +table t +|> select /*+ repartition(3) */ *; + +table t +|> select /*+ repartition(3) */ distinct x; + +table t +|> select /*+ repartition(3) */ all x; -- SELECT operators: negative tests. --------------------------------------- diff --git a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out index 38189f5794b69..e7f7c73aaac42 100644 --- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out @@ -231,22 +231,75 @@ struct -- !query -<<<<<<< HEAD +table t +|> select * +-- !query schema +struct +-- !query output +0 abc +1 def + + +-- !query +table t +|> select * except (y) +-- !query schema +struct +-- !query output +0 +1 + + +-- !query +table t +|> select /*+ repartition(3) */ * +-- !query schema +struct +-- !query output +0 abc +1 def + + +-- !query +table t +|> select /*+ repartition(3) */ distinct x +-- !query schema +struct +-- !query output +0 +1 +-- !query +table t +|> select /*+ repartition(3) */ all x +-- !query schema +struct +-- !query output +0 +1 + + +-- !query table t |> select sum(x) as result -- !query schema struct<> -- !query output -org.apache.spark.sql.catalyst.parser.ParseException +org.apache.spark.sql.AnalysisException { - "errorClass" : "PARSE_SYNTAX_ERROR", - "sqlState" : "42601", + "errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION", + "sqlState" : "0A000", "messageParameters" : { - "error" : "'<<'", - "hint" : "" - } + "expr" : "sum(x#x)" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 19, + "stopIndex" : 24, + "fragment" : "sum(x)" + } ] }