Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-18.0] fix: reference table join merge (#16488) #16495

Merged
merged 1 commit into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 9 additions & 60 deletions go/test/endtoend/vtgate/queries/reference/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package reference

import (
"context"
_ "embed"
"flag"
"fmt"
"os"
Expand All @@ -39,68 +40,16 @@ var (
vtParams mysql.ConnParams

unshardedKeyspaceName = "uks"
unshardedSQLSchema = `
CREATE TABLE IF NOT EXISTS zip(
id BIGINT NOT NULL AUTO_INCREMENT,
code5 INT(5) NOT NULL,
PRIMARY KEY(id)
) ENGINE=InnoDB;
//go:embed uschema.sql
unshardedSQLSchema string
//go:embed uvschema.json
unshardedVSchema string

INSERT INTO zip(id, code5)
VALUES (1, 47107),
(2, 82845),
(3, 11237);

CREATE TABLE IF NOT EXISTS zip_detail(
id BIGINT NOT NULL AUTO_INCREMENT,
zip_id BIGINT NOT NULL,
discontinued_at DATE,
PRIMARY KEY(id)
) ENGINE=InnoDB;

`
unshardedVSchema = `
{
"sharded":false,
"tables": {
"zip": {},
"zip_detail": {}
}
}
`
shardedKeyspaceName = "sks"
shardedSQLSchema = `
CREATE TABLE IF NOT EXISTS delivery_failure (
id BIGINT NOT NULL,
zip_detail_id BIGINT NOT NULL,
reason VARCHAR(255),
PRIMARY KEY(id)
) ENGINE=InnoDB;
`
shardedVSchema = `
{
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
}
},
"tables": {
"delivery_failure": {
"columnVindexes": [
{
"column": "id",
"name": "hash"
}
]
},
"zip_detail": {
"type": "reference",
"source": "` + unshardedKeyspaceName + `.zip_detail"
}
}
}
`
//go:embed sschema.sql
shardedSQLSchema string
//go:embed svschema.json
shardedVSchema string
)

func TestMain(m *testing.M) {
Expand Down
32 changes: 24 additions & 8 deletions go/test/endtoend/vtgate/queries/reference/reference_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,14 @@ func TestReferenceRouting(t *testing.T) {
t,
conn,
`SELECT t.id FROM (
SELECT zd.id, zd.zip_id
FROM `+shardedKeyspaceName+`.zip_detail AS zd
WHERE zd.id IN (2)
ORDER BY zd.discontinued_at
LIMIT 1
) AS t
LEFT JOIN `+shardedKeyspaceName+`.zip_detail AS t0 ON t.zip_id = t0.zip_id
ORDER BY t.id`,
SELECT zd.id, zd.zip_id
FROM `+shardedKeyspaceName+`.zip_detail AS zd
WHERE zd.id IN (2)
ORDER BY zd.discontinued_at
LIMIT 1
) AS t
LEFT JOIN `+shardedKeyspaceName+`.zip_detail AS t0 ON t.zip_id = t0.zip_id
ORDER BY t.id`,
`[[INT64(2)]]`,
)
})
Expand Down Expand Up @@ -156,3 +156,19 @@ func TestReferenceRouting(t *testing.T) {
`[[INT64(2)]]`,
)
}

// TestMultiReferenceQuery runs a single statement that joins delivery_failure
// with itself and then joins uks.zip_detail twice, checking that vtgate can
// execute a query mixing several sharded-table and reference-table occurrences.
func TestMultiReferenceQuery(t *testing.T) {
	utils.SkipIfBinaryIsBelowVersion(t, 21, "vtgate")

	vtConn, closeConn := start(t)
	defer closeConn()

	// The value returned by utils.Exec is discarded: the rows are not inspected,
	// only the execution itself matters here.
	utils.Exec(t, vtConn, `select 1
from delivery_failure df1
join delivery_failure df2 on df1.id = df2.id
join uks.zip_detail zd1 on df1.zip_detail_id = zd1.zip_id
join uks.zip_detail zd2 on zd1.zip_id = zd2.zip_id`)
}
6 changes: 6 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/sschema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS delivery_failure (
id BIGINT NOT NULL,
zip_detail_id BIGINT NOT NULL,
reason VARCHAR(255),
PRIMARY KEY(id)
) ENGINE=InnoDB;
22 changes: 22 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/svschema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"sharded": true,
"vindexes": {
"hash": {
"type": "hash"
}
},
"tables": {
"delivery_failure": {
"columnVindexes": [
{
"column": "id",
"name": "hash"
}
]
},
"zip_detail": {
"type": "reference",
"source": "uks.zip_detail"
}
}
}
17 changes: 17 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/uschema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
CREATE TABLE IF NOT EXISTS zip(
id BIGINT NOT NULL AUTO_INCREMENT,
code5 INT(5) NOT NULL,
PRIMARY KEY(id)
) ENGINE=InnoDB;

INSERT INTO zip(id, code5)
VALUES (1, 47107),
(2, 82845),
(3, 11237);

CREATE TABLE IF NOT EXISTS zip_detail(
id BIGINT NOT NULL AUTO_INCREMENT,
zip_id BIGINT NOT NULL,
discontinued_at DATE,
PRIMARY KEY(id)
) ENGINE=InnoDB;
6 changes: 6 additions & 0 deletions go/test/endtoend/vtgate/queries/reference/uvschema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"tables": {
"zip": {},
"zip_detail": {}
}
}
36 changes: 34 additions & 2 deletions go/vt/vtgate/planbuilder/operators/join_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)

// mergeJoinInputs checks whether two operators can be merged into a single one.
// If they can be merged, a new operator with the merged routing is returned
// If they cannot be merged, nil is returned.
func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, m merger) (*Route, error) {
func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, m *joinMerger) (*Route, error) {
lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs)
if lhsRoute == nil {
return nil, nil
Expand All @@ -41,6 +42,14 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo
case b == dual:
return m.merge(ctx, lhsRoute, rhsRoute, routingA)

// Both sides are reference routes, so their alternates need to be merged as well.
case a == anyShard && b == anyShard && sameKeyspace:
newrouting, err := mergeAnyShardRoutings(ctx, routingA.(*AnyShardRouting), routingB.(*AnyShardRouting), joinPredicates, m.innerJoin)
if err != nil {
return nil, err
}
return m.merge(ctx, lhsRoute, rhsRoute, newrouting)

// an unsharded/reference route can be merged with anything going to that keyspace
case a == anyShard && sameKeyspace:
return m.merge(ctx, lhsRoute, rhsRoute, routingB)
Expand All @@ -66,6 +75,29 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo
}
}

// mergeAnyShardRoutings builds the routing for a join whose two inputs both use
// AnyShardRouting, merging their alternate routes keyspace by keyspace.
//
// For every keyspace that has an alternate in both a and b, the two alternates
// are handed to mergeOrJoin together with joinPredicates and innerJoin. The
// result is recorded as the alternate for that keyspace only when it is a
// single *Route; any other result leaves the keyspace out. Keyspaces that have
// an alternate on only one side are dropped.
//
// The returned routing keeps a's keyspace. The first error returned by
// mergeOrJoin aborts the merge and is returned with a nil routing.
func mergeAnyShardRoutings(ctx *plancontext.PlanningContext, a, b *AnyShardRouting, joinPredicates []sqlparser.Expr, innerJoin bool) (*AnyShardRouting, error) {
	// At most one merged alternate per keyspace on the a side.
	alternates := make(map[*vindexes.Keyspace]*Route, len(a.Alternates))
	for ks, lhsAlt := range a.Alternates {
		// Only alternates in the same keyspace can be merged, so look the
		// keyspace up directly instead of scanning every entry of b.Alternates.
		rhsAlt, found := b.Alternates[ks]
		if !found {
			continue
		}
		op, _, err := mergeOrJoin(ctx, lhsAlt, rhsAlt, joinPredicates, innerJoin)
		if err != nil {
			return nil, err
		}
		// Keep the alternate only if the pair collapsed into a single route.
		if r, ok := op.(*Route); ok {
			alternates[ks] = r
		}
	}
	return &AnyShardRouting{
		keyspace:   a.keyspace,
		Alternates: alternates,
	}, nil
}

func prepareInputRoutes(lhs ops.Operator, rhs ops.Operator) (*Route, *Route, Routing, Routing, routingType, routingType, bool) {
lhsRoute, rhsRoute := operatorsToRoutes(lhs, rhs)
if lhsRoute == nil || rhsRoute == nil {
Expand Down Expand Up @@ -177,7 +209,7 @@ func getRoutingType(r Routing) routingType {
panic(fmt.Sprintf("switch should be exhaustive, got %T", r))
}

func newJoinMerge(predicates []sqlparser.Expr, innerJoin bool) merger {
func newJoinMerge(predicates []sqlparser.Expr, innerJoin bool) *joinMerger {
return &joinMerger{
predicates: predicates,
innerJoin: innerJoin,
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgate/planbuilder/operators/joins.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ limitations under the License.
package operators

import (
"fmt"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
"vitess.io/vitess/go/vt/vtgate/semantics"
Expand Down Expand Up @@ -92,7 +95,7 @@ func AddPredicate(

return join, nil
}
return nil, nil
return nil, vterrors.VT13001(fmt.Sprintf("pushed wrong predicate to the join: %s", sqlparser.String(expr)))
}

// we are looking for predicates like `tbl.col = <>` or `<> = tbl.col`,
Expand Down
25 changes: 25 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/reference_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -922,5 +922,30 @@
"user.user"
]
}
},
{
"comment": "two sharded and two unsharded reference table join - all should be merged into one route",
"query": "select 1 from user u join user_extra ue on u.id = ue.user_id join main.source_of_ref sr on sr.foo = ue.foo join main.rerouted_ref rr on rr.bar = sr.bar",
"plan": {
"QueryType": "SELECT",
"Original": "select 1 from user u join user_extra ue on u.id = ue.user_id join main.source_of_ref sr on sr.foo = ue.foo join main.rerouted_ref rr on rr.bar = sr.bar",
"Instructions": {
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from `user` as u, user_extra as ue, ref_with_source as sr, ref as rr where 1 != 1",
"Query": "select 1 from `user` as u, user_extra as ue, ref_with_source as sr, ref as rr where rr.bar = sr.bar and u.id = ue.user_id and sr.foo = ue.foo",
"Table": "`user`, ref, ref_with_source, user_extra"
},
"TablesUsed": [
"user.ref",
"user.ref_with_source",
"user.user",
"user.user_extra"
]
}
}
]
Loading