Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Planner support for VALUES-based Joins #17641

Draft
wants to merge 34 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e3c6b57
first steps in using JoinPredicate
systay Feb 28, 2025
ea3d4e9
feat: help dual merging with the new join predicates - wip
systay Feb 28, 2025
1c1260d
feat: dont add predicates if already under the route
systay Feb 28, 2025
d5105e0
re-instate the derived table rewriting
systay Feb 28, 2025
14668f7
feat: create a new instance when cloning
systay Feb 28, 2025
13aa331
only use a predicate ID if it's tracked
systay Feb 28, 2025
4bd8084
handle CTE predicates
systay Feb 28, 2025
11ed6d3
strip out JoinPredicates to maintain precedence calculations
systay Mar 2, 2025
efea30d
skip first, strip out jp after
systay Mar 3, 2025
cb5e0bf
make sure to keep predicate IDs unique
systay Mar 3, 2025
90d6ea4
recursive cte to update join predicate with original on merge
harshit-gangal Mar 3, 2025
3deffec
feat: make sure that evalengine knows how to handle join predicates
systay Mar 3, 2025
ad3d33d
Merge remote-tracking branch 'upstream/main' into new-predicates
systay Mar 3, 2025
ebeae68
refactor: don't pass along predicates we already have
systay Mar 3, 2025
52ba3d4
feat: merge apply-joins at a later stage when possible
systay Mar 3, 2025
6dc9e3c
handle pushing of applyJoin under Route correctly
systay Mar 3, 2025
d0d5ba5
Merge remote-tracking branch 'upstream/main' into new-predicates
systay Mar 3, 2025
610d76f
refactor: clean up of code
systay Mar 3, 2025
9a7210c
more clean up
systay Mar 3, 2025
fef7ecb
refactor: more simplification
systay Mar 3, 2025
a541e12
start planning values join
frouioui Jan 22, 2025
2c56eba
wip: added routing logic to update after pushing values under the route
harshit-gangal Mar 4, 2025
8920543
feat: calculate the tuple bindvar offset
systay Mar 5, 2025
3afc391
feat: allow passing through of columns through Values op
systay Mar 5, 2025
37f8f8e
add pre-commit check for updates of onecase.json
systay Mar 5, 2025
a59fa2b
add column alias to expression pushed to right hand side of values join
harshit-gangal Mar 5, 2025
2defbed
Merge remote-tracking branch 'upstream/main' into values-join-planner
harshit-gangal Mar 5, 2025
177fb08
feat: make sure to handle Values and subqueries well
systay Mar 5, 2025
d61715f
codegen
systay Mar 5, 2025
3a782d6
Fix TestJoinValuesExecute
frouioui Mar 5, 2025
8b5c4f0
Clean up the use of the ALLOW_VALUES_JOIN query hint
frouioui Mar 5, 2025
6883f5e
wip - run values join everywhere - test E2E tests
frouioui Mar 5, 2025
f70aab2
use row tuple in TupleBindVariable
frouioui Mar 5, 2025
bb4a289
wip - try to fix TestUsingJoin
frouioui Mar 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/sqltypes/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ const (
Tuple = querypb.Type_TUPLE
BitNum = querypb.Type_BITNUM
Vector = querypb.Type_VECTOR
RowTuple = querypb.Type_ROW_TUPLE
)

// bit-shift the mysql flags by two byte so we
Expand Down
93 changes: 92 additions & 1 deletion go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package misc
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"testing"
Expand All @@ -34,7 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/utils"
)

func start(t *testing.T) (utils.MySQLCompare, func()) {
func start(t testing.TB) (utils.MySQLCompare, func()) {
mcmp, err := utils.NewMySQLCompare(t, vtParams, mysqlParams)
require.NoError(t, err)

Expand All @@ -53,6 +54,96 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
}
}

// BenchmarkValuesJoin runs a join between t1 and tbl at increasing table sizes.
// Besides the standard timing and allocation figures, every sub-benchmark
// reports "queries_routed/op": the growth of the summed QueriesRouted counters
// exposed on vtgate's /debug/vars endpoint, divided by the number of iterations.
func BenchmarkValuesJoin(b *testing.B) {
	mcmp, closer := start(b)
	defer closer()

	// debugVars mirrors the single /debug/vars entry this benchmark reads.
	type debugVars struct {
		QueriesRouted map[string]int `json:"QueriesRouted"`
	}

	// getQueriesRouted returns the sum of all QueriesRouted counters.
	getQueriesRouted := func(thisB *testing.B) int {
		// The status and third return value of the call are discarded here; an
		// empty or malformed response is reported by the decode check below.
		_, response, _ := clusterInstance.VtgateProcess.MakeAPICall("/debug/vars")
		vars := debugVars{}
		err := json.Unmarshal([]byte(response), &vars)
		require.NoError(thisB, err)

		var total int
		for _, count := range vars.QueriesRouted {
			total += count
		}
		return total
	}

	b.ReportAllocs()

	// Rows are only ever added: each test case tops the tables up to its target
	// size, so these counters remember how many rows already exist.
	lhsRowCount := 0
	rhsRowCount := 0

	insertLHS := func(count int) {
		for ; lhsRowCount < count; lhsRowCount++ {
			mcmp.Exec(fmt.Sprintf("insert into t1(id1, id2) values (%d, %d)", lhsRowCount, lhsRowCount))
		}
	}
	insertRHS := func(count int) {
		for ; rhsRowCount < count; rhsRowCount++ {
			mcmp.Exec(fmt.Sprintf("insert into tbl(id, unq_col, nonunq_col) values (%d, %d, %d)", rhsRowCount, rhsRowCount, rhsRowCount))
		}
	}

	testCases := []struct {
		lhsRowCount int
		rhsRowCount int
	}{
		{lhsRowCount: 20, rhsRowCount: 10},
		{lhsRowCount: 50, rhsRowCount: 25},
		{lhsRowCount: 100, rhsRowCount: 50},
		{lhsRowCount: 200, rhsRowCount: 100},
		{lhsRowCount: 500, rhsRowCount: 250},
		{lhsRowCount: 1000, rhsRowCount: 500},
		{lhsRowCount: 2000, rhsRowCount: 1000},
	}

	// previousQueriesRoutedSum is the counter total seen at the end of the last
	// sub-benchmark invocation; the next invocation reports the growth since then.
	var previousQueriesRoutedSum int
	for _, testCase := range testCases {
		insertLHS(testCase.lhsRowCount)
		insertRHS(testCase.rhsRowCount)

		b.Run(fmt.Sprintf("LHS(%d) RHS(%d)", testCase.lhsRowCount, testCase.rhsRowCount), func(b *testing.B) {
			for range b.N {
				mcmp.Exec("select t1.id1, tbl.id from t1, tbl where t1.id2 = tbl.nonunq_col")
			}
			b.StopTimer()

			totalQueriesRouted := getQueriesRouted(b)
			queriesRouted := totalQueriesRouted - previousQueriesRoutedSum
			previousQueriesRoutedSum = totalQueriesRouted
			// Divide in floating point: integer division would truncate the
			// per-operation figure and report 0 when queriesRouted < b.N.
			b.ReportMetric(float64(queriesRouted)/float64(b.N), "queries_routed/op")
		})
	}
}

func TestBitVals(t *testing.T) {
mcmp, closer := start(t)
defer closer()
Expand Down
2 changes: 2 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
DirectiveAllowHashJoin = "ALLOW_HASH_JOIN"
// DirectiveQueryPlanner lets the user specify per query which planner should be used
DirectiveQueryPlanner = "PLANNER"
// DirectiveAllowValuesJoin allows the planner to use VALUES JOINS when possible.
DirectiveAllowValuesJoin = "ALLOW_VALUES_JOIN"
// DirectiveVExplainRunDMLQueries tells vexplain queries/all that it is okay to also run the query.
DirectiveVExplainRunDMLQueries = "EXECUTE_DML_QUERIES"
// DirectiveConsolidator enables the query consolidator.
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/fake_vcursor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ func printBindVars(bindvars map[string]*querypb.BindVariable) string {
fmt.Fprintf(buf, " ")
}

if bindvars[k].Type == querypb.Type_TUPLE {
if bindvars[k].Type == querypb.Type_ROW_TUPLE {
fmt.Fprintf(buf, "%s: [", k)
for _, val := range bindvars[k].Values {
if val.Type != querypb.Type_TUPLE {
Expand Down
101 changes: 0 additions & 101 deletions go/vt/vtgate/engine/join_values_test.go

This file was deleted.

5 changes: 2 additions & 3 deletions go/vt/vtgate/engine/simple_projection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package engine

import (
"context"
"fmt"
"strconv"
"strings"

Expand Down Expand Up @@ -149,9 +148,9 @@ func (sc *SimpleProjection) description() PrimitiveDescription {
}

var colNames []string
for idx, cName := range sc.ColNames {
for _, cName := range sc.ColNames {
if cName != "" {
colNames = append(colNames, fmt.Sprintf("%d:%s", idx, cName))
colNames = append(colNames, cName)
}
}
if colNames != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package engine

import (
"context"
"fmt"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -34,10 +35,24 @@ type ValuesJoin struct {
// of the Join. They can be any primitive.
Left, Right Primitive

Vars []int
RowConstructorArg string
Cols []int
ColNames []string
// The name for the bind var containing the tuple-of-tuples being sent to the RHS
BindVarName string

// RowID tells whether a row ID is appended to each LHS row sent to the RHS, which makes it possible to use columns from the LHS in the output.
// If RowID is false, the output will be the same as the RHS, so the following fields are ignored - Cols, ColNames.
// We copy everything from the LHS to the RHS in this case, and column names are taken from the RHS.
RowID bool

// CopyColumnsToRHS are the offsets of columns from LHS we are copying over to the RHS
// []int{0,2} means that the first column in the t-o-t is the first offset from the left and the second column is the third offset
CopyColumnsToRHS []int

// Cols tells us which side the output columns come from:
// negative numbers are offsets to the left, and positive to the right
Cols []int

// ColNames are the output column names
ColNames []string
}

// TryExecute performs a non-streaming exec.
Expand All @@ -47,7 +62,7 @@ func (jv *ValuesJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars
return nil, err
}
bv := &querypb.BindVariable{
Type: querypb.Type_TUPLE,
Type: querypb.Type_ROW_TUPLE,
}
if len(lresult.Rows) == 0 && wantfields {
// If there are no rows, we still need to construct a single row
Expand All @@ -60,27 +75,43 @@ func (jv *ValuesJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars
}
bv.Values = append(bv.Values, sqltypes.TupleToProto(vals))

bindVars[jv.RowConstructorArg] = bv
bindVars[jv.BindVarName] = bv
if jv.RowID {
panic("implement me")
}
return jv.Right.GetFields(ctx, vcursor, bindVars)
}

rowSize := len(jv.CopyColumnsToRHS)
if jv.RowID {
rowSize++ // +1 since we add the row ID
}
for i, row := range lresult.Rows {
newRow := make(sqltypes.Row, 0, len(jv.Vars)+1) // +1 since we always add the row ID
newRow = append(newRow, sqltypes.NewInt64(int64(i))) // Adding the LHS row ID

for _, loffset := range jv.Vars {
newRow = append(newRow, row[loffset])
newRow := make(sqltypes.Row, 0, rowSize)

if jv.RowID {
for _, loffset := range jv.CopyColumnsToRHS {
newRow = append(newRow, row[loffset])
}
newRow = append(newRow, sqltypes.NewInt64(int64(i))) // Adding the LHS row ID
} else {
newRow = row
}

bv.Values = append(bv.Values, sqltypes.TupleToProto(newRow))
}

bindVars[jv.RowConstructorArg] = bv
bindVars[jv.BindVarName] = bv
rresult, err := vcursor.ExecutePrimitive(ctx, jv.Right, bindVars, wantfields)
if err != nil {
return nil, err
}

if !jv.RowID {
// if we are not using the row ID, we can just return the result from the RHS
return rresult, nil
}

result := &sqltypes.Result{}

result.Fields = joinFields(lresult.Fields, rresult.Fields, jv.Cols)
Expand Down Expand Up @@ -143,8 +174,9 @@ func (jv *ValuesJoin) description() PrimitiveDescription {
OperatorType: "Join",
Variant: "Values",
Other: map[string]any{
"ValuesArg": jv.RowConstructorArg,
"Vars": jv.Vars,
"BindVarName": jv.BindVarName,
"CopyColumnsToRHS": jv.CopyColumnsToRHS,
"RowID": fmt.Sprintf("%t", jv.RowID),
},
}
}
Loading
Loading