Skip to content

Commit

Permalink
commit
Browse files Browse the repository at this point in the history
  • Loading branch information
dtenedor committed Sep 12, 2024
1 parent cf0c913 commit 9929f63
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ class AstBuilder extends DataTypeAstBuilder
ctx.aggregationClause,
ctx.havingClause,
ctx.windowClause,
plan
plan,
isPipeOperatorSelect = false
)
}
}
Expand Down Expand Up @@ -1013,7 +1014,8 @@ class AstBuilder extends DataTypeAstBuilder
ctx.aggregationClause,
ctx.havingClause,
ctx.windowClause,
from
from,
isPipeOperatorSelect = false
)
}

Expand Down Expand Up @@ -1100,7 +1102,8 @@ class AstBuilder extends DataTypeAstBuilder
aggregationClause,
havingClause,
windowClause,
isDistinct = false)
isDistinct = false,
isPipeOperatorSelect = false)

ScriptTransformation(
string(visitStringLit(transformClause.script)),
Expand All @@ -1121,6 +1124,8 @@ class AstBuilder extends DataTypeAstBuilder
* Add a regular (SELECT) query specification to a logical plan. The query specification
* is the core of the logical plan, this is where sourcing (FROM clause), projection (SELECT),
* aggregation (GROUP BY ... HAVING ...) and filtering (WHERE) takes place.
* If 'isPipeOperatorSelect' is true, wraps each projected expression with a [[PipeSelect]]
* expression for future validation of the expressions during analysis.
*
* Note that query hints are ignored (both by the parser and the builder).
*/
Expand All @@ -1132,7 +1137,8 @@ class AstBuilder extends DataTypeAstBuilder
aggregationClause: AggregationClauseContext,
havingClause: HavingClauseContext,
windowClause: WindowClauseContext,
relation: LogicalPlan): LogicalPlan = withOrigin(ctx) {
relation: LogicalPlan,
isPipeOperatorSelect: Boolean): LogicalPlan = withOrigin(ctx) {
val isDistinct = selectClause.setQuantifier() != null &&
selectClause.setQuantifier().DISTINCT() != null

Expand All @@ -1144,7 +1150,8 @@ class AstBuilder extends DataTypeAstBuilder
aggregationClause,
havingClause,
windowClause,
isDistinct)
isDistinct,
isPipeOperatorSelect)

// Hint
selectClause.hints.asScala.foldRight(plan)(withHints)
Expand All @@ -1158,7 +1165,8 @@ class AstBuilder extends DataTypeAstBuilder
aggregationClause: AggregationClauseContext,
havingClause: HavingClauseContext,
windowClause: WindowClauseContext,
isDistinct: Boolean): LogicalPlan = {
isDistinct: Boolean,
isPipeOperatorSelect: Boolean): LogicalPlan = {
// Add lateral views.
val withLateralView = lateralView.asScala.foldLeft(relation)(withGenerate)

Expand All @@ -1172,7 +1180,20 @@ class AstBuilder extends DataTypeAstBuilder
}

// Builds the final projection for this query specification, or passes the filtered
// relation through unchanged when the SELECT list is empty (e.g. transform queries).
def createProject() = if (namedExpressions.nonEmpty) {
val newProjectList: Seq[NamedExpression] = if (isPipeOperatorSelect) {
// If this is a pipe operator |> SELECT clause, add a [[PipeSelect]] expression wrapping
// each alias in the project list, so the analyzer can check invariants later.
namedExpressions.map {
case a: Alias =>
// Rewrap only the alias child; the alias metadata (name, exprId) is preserved by
// withNewChildren, and the cast is safe because Alias.withNewChildren yields an Alias.
a.withNewChildren(Seq(PipeSelect(a.child)))
.asInstanceOf[NamedExpression]
case other =>
// Non-alias expressions (e.g. bare attribute references, star expansions) are
// left untouched; only aliased expressions need the PipeSelect marker.
other
}
} else {
// Regular (non-pipe) SELECT: keep the project list as parsed.
namedExpressions
}
Project(newProjectList, withFilter)
} else {
withFilter
}
Expand Down Expand Up @@ -5721,16 +5742,6 @@ class AstBuilder extends DataTypeAstBuilder
operationNotAllowed("Operator pipe SQL syntax using |>", ctx)
}
Option(ctx.selectClause).map { c =>
def updateProject(p: Project): Project = {
val newProjectList: Seq[NamedExpression] = p.projectList.map {
case a: Alias =>
a.withNewChildren(Seq(PipeSelect(a.child)))
.asInstanceOf[NamedExpression]
case other =>
other
}
p.copy(projectList = newProjectList)
}
withSelectQuerySpecification(
ctx = ctx,
selectClause = c,
Expand All @@ -5739,16 +5750,8 @@ class AstBuilder extends DataTypeAstBuilder
aggregationClause = null,
havingClause = null,
windowClause = null,
left) match {
// The input should generally be a projection since we only pass a context for the SELECT
// clause here and pass "null" for all other clauses.
case p: Project =>
updateProject(p)
case d @ Distinct(p: Project) =>
d.copy(child = updateProject(p))
case other =>
throw SparkException.internalError(s"Unrecognized matched logical plan: $other")
}
relation = left,
isPipeOperatorSelect = true)
}.getOrElse(Option(ctx.whereClause).map { c =>
// Add a table subquery boundary between the new filter and the input plan if one does not
// already exist. This helps the analyzer behave as if we had added the WHERE clause after a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,72 @@ Distinct


-- !query
table t
|> select *
-- !query analysis
Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select * except (y)
-- !query analysis
Project [x#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ *
-- !query analysis
Repartition 3, true
+- Project [x#x, y#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ distinct x
-- !query analysis
Repartition 3, true
+- Distinct
+- Project [x#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select /*+ repartition(3) */ all x
-- !query analysis
Repartition 3, true
+- Project [x#x]
+- SubqueryAlias spark_catalog.default.t
+- Relation spark_catalog.default.t[x#x,y#x] csv


-- !query
table t
|> select sum(x) as result
-- !query analysis
org.apache.spark.sql.catalyst.parser.ParseException
org.apache.spark.sql.AnalysisException
{
"errorClass" : "PARSE_SYNTAX_ERROR",
"sqlState" : "42601",
"errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
"sqlState" : "0A000",
"messageParameters" : {
"error" : "'<<'",
"hint" : ""
}
"expr" : "sum(x#x)"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 19,
"stopIndex" : 24,
"fragment" : "sum(x)"
} ]
}


Expand Down
17 changes: 16 additions & 1 deletion sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,22 @@ table t
table t
|> select distinct x, y;

-- SELECT * is supported.
table t
|> select *;

table t
|> select * except (y);

-- Hints are supported.
table t
|> select /*+ repartition(3) */ *;

table t
|> select /*+ repartition(3) */ distinct x;

table t
|> select /*+ repartition(3) */ all x;

-- SELECT operators: negative tests.
---------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,22 +231,75 @@ struct<x:int,y:string>


-- !query
table t
|> select *
-- !query schema
struct<x:int,y:string>
-- !query output
0 abc
1 def


-- !query
table t
|> select * except (y)
-- !query schema
struct<x:int>
-- !query output
0
1


-- !query
table t
|> select /*+ repartition(3) */ *
-- !query schema
struct<x:int,y:string>
-- !query output
0 abc
1 def


-- !query
table t
|> select /*+ repartition(3) */ distinct x
-- !query schema
struct<x:int>
-- !query output
0
1


-- !query
table t
|> select /*+ repartition(3) */ all x
-- !query schema
struct<x:int>
-- !query output
0
1


-- !query
table t
|> select sum(x) as result
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.parser.ParseException
org.apache.spark.sql.AnalysisException
{
"errorClass" : "PARSE_SYNTAX_ERROR",
"sqlState" : "42601",
"errorClass" : "PIPE_OPERATOR_SELECT_CONTAINS_AGGREGATE_FUNCTION",
"sqlState" : "0A000",
"messageParameters" : {
"error" : "'<<'",
"hint" : ""
}
"expr" : "sum(x#x)"
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 19,
"stopIndex" : 24,
"fragment" : "sum(x)"
} ]
}


Expand Down

0 comments on commit 9929f63

Please sign in to comment.