Skip to content

Commit

Permalink
feat: add a LIMIT 1 on EXISTS subqueries to limit network overhead
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Jun 14, 2024
1 parent cd17de9 commit 711aa59
Show file tree
Hide file tree
Showing 9 changed files with 239 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ select name from user where id not in (select id from t1) /* non-correlated subq
----------------------------------------------------------------------
select name from user where exists (select id from t1) /* non-correlated subquery as EXISTS */

1 ks_unsharded/-: select 1 from t1 limit 10001 /* non-correlated subquery as EXISTS */
1 ks_unsharded/-: select 1 from t1 limit 1 /* non-correlated subquery as EXISTS */
2 ks_sharded/-40: select `name` from `user` where 1 limit 10001 /* non-correlated subquery as EXISTS */
2 ks_sharded/40-80: select `name` from `user` where 1 limit 10001 /* non-correlated subquery as EXISTS */
2 ks_sharded/80-c0: select `name` from `user` where 1 limit 10001 /* non-correlated subquery as EXISTS */
Expand Down
103 changes: 102 additions & 1 deletion go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package operators
import (
"fmt"
"io"
"strconv"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
Expand Down Expand Up @@ -244,8 +245,108 @@ func tryPushLimit(ctx *plancontext.PlanningContext, in *Limit) (Operator, *Apply
}

return src, Rewrote(fmt.Sprintf("push limit to %s of apply join", side))
case *Limit:
combinedLimit := mergeLimits(in.AST, src.AST)
if combinedLimit == nil {
break
}
// we can remove the other LIMIT
in.AST = combinedLimit
in.Source = src.Source
return in, Rewrote("merged two limits")

}
return setUpperLimit(in)
}

func mergeLimits(l1, l2 *sqlparser.Limit) *sqlparser.Limit {
// To merge two relational LIMIT operators with LIMIT and OFFSET, we need to combine their
// LIMIT and OFFSET values appropriately.
// Let's denote the first LIMIT operator as LIMIT_1 with LIMIT_1 and OFFSET_1,
// and the second LIMIT operator as LIMIT_2 with LIMIT_2 and OFFSET_2.
// The second LIMIT operator receives the output of the first LIMIT operator, meaning the first LIMIT and
// OFFSET are applied first, and then the second LIMIT and OFFSET are applied to the resulting subset.
//
// The goal is to determine the effective combined LIMIT and OFFSET values when applying these two operators sequentially.
//
// Combined Offset:
// The combined offset (OFFSET_combined) is the sum of the two offsets because you need to skip OFFSET_1 rows first,
// and then apply the second offset OFFSET_2 to the result.
// OFFSET_combined = OFFSET_1 + OFFSET_2

// Combined Limit:
// The combined limit (LIMIT_combined) needs to account for both limits. The effective limit should not exceed the rows returned by the first limit,
// so it is the minimum of the remaining rows after the first offset and the second limit.
// LIMIT_combined = min(LIMIT_2, LIMIT_1 - OFFSET_2)

// Note: If LIMIT_1 - OFFSET_2 is negative or zero, it means there are no rows left to limit, so LIMIT_combined should be zero.

// Example:
// First LIMIT operator: LIMIT 10 OFFSET 5 (LIMIT_1 = 10, OFFSET_1 = 5)
// Second LIMIT operator: LIMIT 7 OFFSET 3 (LIMIT_2 = 7, OFFSET_2 = 3)

// Calculations:
// Combined OFFSET:
// OFFSET_combined = 5 + 3 = 8

// Combined LIMIT:
// remaining rows after OFFSET_2 = 10 - 3 = 7
// LIMIT_combined = min(7, 7) = 7

// So, the combined result would be:
// LIMIT 7 OFFSET 8

// This method ensures that the final combined LIMIT and OFFSET correctly reflect the sequential application of the two original operators.

offsetMerger := func(v1, v2 int) int {
return v1 + v2
}

failed := false
limitMerger := func(v1, v2 int) int {
if l2.Offset == nil {
return min(v1, v2)
}
off2, ok := l2.Offset.(*sqlparser.Literal)
if !ok {
failed = true
return 0
}
off2int, _ := strconv.Atoi(off2.Val)
return min(v2, v1-off2int)
}

limit := &sqlparser.Limit{
Offset: mergeLimitExpressions(l1.Offset, l2.Offset, offsetMerger),
Rowcount: mergeLimitExpressions(l1.Rowcount, l2.Rowcount, limitMerger),
}
if failed {
return nil
}

return limit
}

func mergeLimitExpressions(e1, e2 sqlparser.Expr, merger func(v1, v2 int) int) sqlparser.Expr {
switch {
case e1 == nil && e2 == nil:
return nil
case e1 == nil:
return e2
case e2 == nil:
return e1
default:
return setUpperLimit(in)
v1str, ok := e1.(*sqlparser.Literal)
if !ok {
return nil
}
v2str, ok := e2.(*sqlparser.Literal)
if !ok {
return nil
}
v1, _ := strconv.Atoi(v1str.Val)
v2, _ := strconv.Atoi(v2str.Val)
return sqlparser.NewIntLiteral(strconv.Itoa(merger(v1, v2)))
}
}

Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtgate/planbuilder/operators/subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,11 +228,21 @@ func (sq *SubQuery) settle(ctx *plancontext.PlanningContext, outer Operator) Ope
var correlatedSubqueryErr = vterrors.VT12001("correlated subquery is only supported for EXISTS")
var subqueryNotAtTopErr = vterrors.VT12001("unmergable subquery can not be inside complex expression")

func (sq *SubQuery) addLimit() {
// for a correlated subquery, we can add a limit 1 to the subquery
sq.Subquery = &Limit{
Source: sq.Subquery,
AST: &sqlparser.Limit{Rowcount: sqlparser.NewIntLiteral("1")},
Top: true,
}
}

func (sq *SubQuery) settleFilter(ctx *plancontext.PlanningContext, outer Operator) Operator {
if len(sq.Predicates) > 0 {
if sq.FilterType != opcode.PulloutExists {
panic(correlatedSubqueryErr)
}
sq.addLimit()
return outer
}

Expand Down Expand Up @@ -260,8 +270,10 @@ func (sq *SubQuery) settleFilter(ctx *plancontext.PlanningContext, outer Operato
var predicates []sqlparser.Expr
switch sq.FilterType {
case opcode.PulloutExists:
sq.addLimit()
predicates = append(predicates, sqlparser.NewArgument(hasValuesArg()))
case opcode.PulloutNotExists:
sq.addLimit()
sq.FilterType = opcode.PulloutExists // it's the same pullout as EXISTS, just with a NOT in front of the predicate
predicates = append(predicates, sqlparser.NewNotExpr(sqlparser.NewArgument(hasValuesArg())))
case opcode.PulloutIn:
Expand Down
26 changes: 16 additions & 10 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@
"Sharded": true
},
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra where user_id = 3 and user_id < :user_id",
"Query": "select 1 from user_extra where user_id = 3 and user_id < :user_id limit 1",
"Table": "user_extra",
"Values": [
"3"
Expand Down Expand Up @@ -2590,15 +2590,21 @@
},
{
"InputName": "SubQuery",
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra where user_extra.bar = :user_apa",
"Table": "user_extra"
"OperatorType": "Limit",
"Count": "1",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from user_extra where 1 != 1",
"Query": "select 1 from user_extra where user_extra.bar = :user_apa limit 1",
"Table": "user_extra"
}
]
}
]
}
Expand Down
28 changes: 17 additions & 11 deletions go/vt/vtgate/planbuilder/testdata/filter_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -2014,15 +2014,21 @@
"Inputs": [
{
"InputName": "SubQuery",
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from `user` where 1 != 1",
"Query": "select 1 from `user`",
"Table": "`user`"
"OperatorType": "Limit",
"Count": "1",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from `user` where 1 != 1",
"Query": "select 1 from `user` limit 1",
"Table": "`user`"
}
]
},
{
"InputName": "Outer",
Expand Down Expand Up @@ -2854,7 +2860,7 @@
"Sharded": true
},
"FieldQuery": "select 1 from `user` as u2 where 1 != 1",
"Query": "select 1 from `user` as u2 where u2.id = 5",
"Query": "select 1 from `user` as u2 where u2.id = 5 limit 1",
"Table": "`user`",
"Values": [
"5"
Expand Down Expand Up @@ -4311,7 +4317,7 @@
"Sharded": false
},
"FieldQuery": "select 1 from unsharded as u2 where 1 != 1",
"Query": "select 1 from unsharded as u2 where u2.baz = :u1_bar",
"Query": "select 1 from unsharded as u2 where u2.baz = :u1_bar limit 1",
"Table": "unsharded"
}
]
Expand Down
52 changes: 29 additions & 23 deletions go/vt/vtgate/planbuilder/testdata/info_schema57_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -946,31 +946,37 @@
"Inputs": [
{
"InputName": "SubQuery",
"OperatorType": "Concatenate",
"OperatorType": "Limit",
"Count": "1",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select 1 as found from information_schema.`tables` where 1 != 1",
"Query": "select 1 as found from information_schema.`tables` where table_name = :table_name1 /* VARCHAR */ and table_name = :table_name1 /* VARCHAR */",
"SysTableTableName": "[table_name1:'Music']",
"Table": "information_schema.`tables`"
},
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select 1 as found from information_schema.views where 1 != 1",
"Query": "select 1 as found from information_schema.views where table_name = :table_name2 /* VARCHAR */ and table_name = :table_name2 /* VARCHAR */ limit 1",
"SysTableTableName": "[table_name2:'user']",
"Table": "information_schema.views"
"OperatorType": "Concatenate",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select 1 as found from information_schema.`tables` where 1 != 1",
"Query": "select 1 as found from information_schema.`tables` where table_name = :table_name1 /* VARCHAR */ and table_name = :table_name1 /* VARCHAR */ limit :__upper_limit",
"SysTableTableName": "[table_name1:'Music']",
"Table": "information_schema.`tables`"
},
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select 1 as found from information_schema.views where 1 != 1",
"Query": "select 1 as found from information_schema.views where table_name = :table_name2 /* VARCHAR */ and table_name = :table_name2 /* VARCHAR */ limit :__upper_limit",
"SysTableTableName": "[table_name2:'user']",
"Table": "information_schema.views"
}
]
}
]
},
Expand Down
52 changes: 29 additions & 23 deletions go/vt/vtgate/planbuilder/testdata/info_schema80_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1011,31 +1011,37 @@
"Inputs": [
{
"InputName": "SubQuery",
"OperatorType": "Concatenate",
"OperatorType": "Limit",
"Count": "1",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select 1 as found from information_schema.`tables` where 1 != 1",
"Query": "select 1 as found from information_schema.`tables` where table_name = :table_name1 /* VARCHAR */ and table_name = :table_name1 /* VARCHAR */",
"SysTableTableName": "[table_name1:'Music']",
"Table": "information_schema.`tables`"
},
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select 1 as found from information_schema.views where 1 != 1",
"Query": "select 1 as found from information_schema.views where table_name = :table_name2 /* VARCHAR */ and table_name = :table_name2 /* VARCHAR */ limit 1",
"SysTableTableName": "[table_name2:'user']",
"Table": "information_schema.views"
"OperatorType": "Concatenate",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select 1 as found from information_schema.`tables` where 1 != 1",
"Query": "select 1 as found from information_schema.`tables` where table_name = :table_name1 /* VARCHAR */ and table_name = :table_name1 /* VARCHAR */ limit :__upper_limit",
"SysTableTableName": "[table_name1:'Music']",
"Table": "information_schema.`tables`"
},
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select 1 as found from information_schema.views where 1 != 1",
"Query": "select 1 as found from information_schema.views where table_name = :table_name2 /* VARCHAR */ and table_name = :table_name2 /* VARCHAR */ limit :__upper_limit",
"SysTableTableName": "[table_name2:'user']",
"Table": "information_schema.views"
}
]
}
]
},
Expand Down
Loading

0 comments on commit 711aa59

Please sign in to comment.