Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip - dirty implementation of findRoute for Values #17899

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 92 additions & 1 deletion go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package misc
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"strings"
"testing"
Expand All @@ -34,7 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/utils"
)

func start(t *testing.T) (utils.MySQLCompare, func()) {
func start(t testing.TB) (utils.MySQLCompare, func()) {
mcmp, err := utils.NewMySQLCompare(t, vtParams, mysqlParams)
require.NoError(t, err)

Expand All @@ -53,6 +54,96 @@ func start(t *testing.T) (utils.MySQLCompare, func()) {
}
}

func BenchmarkValuesJoin(b *testing.B) {
mcmp, closer := start(b)
defer closer()

type Rep struct {
QueriesRouted map[string]int `json:"QueriesRouted"`
}

getQueriesRouted := func(thisB *testing.B) int {
_, response, _ := clusterInstance.VtgateProcess.MakeAPICall("/debug/vars")
r := Rep{}
err := json.Unmarshal([]byte(response), &r)
require.NoError(thisB, err)

var res int
for _, c := range r.QueriesRouted {
res += c
}
return res
}

b.ReportAllocs()

lhsRowCount := 0
rhsRowCount := 0

insertLHS := func(count int) {
for ; lhsRowCount < count; lhsRowCount++ {
mcmp.Exec(fmt.Sprintf("insert into t1(id1, id2) values (%d, %d)", lhsRowCount, lhsRowCount))
}
}
insertRHS := func(count int) {
for ; rhsRowCount < count; rhsRowCount++ {
mcmp.Exec(fmt.Sprintf("insert into tbl(id, unq_col, nonunq_col) values (%d, %d, %d)", rhsRowCount, rhsRowCount, rhsRowCount))
}
}

testCases := []struct {
lhsRowCount int
rhsRowCount int
}{
{
lhsRowCount: 20,
rhsRowCount: 10,
},
{
lhsRowCount: 50,
rhsRowCount: 25,
},
{
lhsRowCount: 100,
rhsRowCount: 50,
},
{
lhsRowCount: 200,
rhsRowCount: 100,
},
{
lhsRowCount: 500,
rhsRowCount: 250,
},
{
lhsRowCount: 1000,
rhsRowCount: 500,
},
{
lhsRowCount: 2000,
rhsRowCount: 1000,
},
}

var previousQueriesRoutedSum int
for _, testCase := range testCases {
insertLHS(testCase.lhsRowCount)
insertRHS(testCase.rhsRowCount)

b.Run(fmt.Sprintf("LHS(%d) RHS(%d)", testCase.lhsRowCount, testCase.rhsRowCount), func(b *testing.B) {
for range b.N {
mcmp.Exec("select t1.id1, tbl.id from t1, tbl where t1.id2 = tbl.nonunq_col")
}
b.StopTimer()

totalQueriesRouted := getQueriesRouted(b)
queriesRouted := totalQueriesRouted - previousQueriesRoutedSum
previousQueriesRoutedSum = totalQueriesRouted
b.ReportMetric(float64(queriesRouted/b.N), "queries_routed/op")
})
}
}

func TestBitVals(t *testing.T) {
mcmp, closer := start(t)
defer closer()
Expand Down
6 changes: 6 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ const (
DirectiveAllowHashJoin = "ALLOW_HASH_JOIN"
// DirectiveQueryPlanner lets the user specify per query which planner should be used
DirectiveQueryPlanner = "PLANNER"
// DirectiveAllowValuesJoin allows the planner to use VALUES JOINS when possible.
DirectiveAllowValuesJoin = "ALLOW_VALUES_JOIN"
// DirectiveVExplainRunDMLQueries tells vexplain queries/all that it is okay to also run the query.
DirectiveVExplainRunDMLQueries = "EXECUTE_DML_QUERIES"
// DirectiveConsolidator enables the query consolidator.
Expand Down Expand Up @@ -554,6 +556,10 @@ func AllowScatterDirective(stmt Statement) bool {
return checkDirective(stmt, DirectiveAllowScatter)
}

func AllowValuesJoinDirective(stmt Statement) bool {
return checkDirective(stmt, DirectiveAllowValuesJoin)
}

func checkDirective(stmt Statement, key string) bool {
cmt, ok := stmt.(Commented)
if ok {
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 0 additions & 101 deletions go/vt/vtgate/engine/join_values_test.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package engine

import (
"context"
"fmt"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand All @@ -34,10 +35,24 @@ type ValuesJoin struct {
// of the Join. They can be any primitive.
Left, Right Primitive

Vars []int
RowConstructorArg string
Cols []int
ColNames []string
// The name for the bind var containing the tuple-of-tuples being sent to the RHS
BindVarName string

// LHSRowID is the offset of the row ID in the LHS, used to use columns from the LHS in the output
// If LHSRowID is false, the output will be the same as the RHS, so the following fields are ignored - Cols, ColNames.
// We copy everything from the LHS to the RHS in this case, and column names are taken from the RHS.
RowID bool

// CopyColumnsToRHS are the offsets of columns from LHS we are copying over to the RHS
// []int{0,2} means that the first column in the t-o-t is the first offset from the left and the second column is the third offset
CopyColumnsToRHS []int

// Cols tells use which side the output columns come from:
// negative numbers are offsets to the left, and positive to the right
Cols []int

// ColNames are the output column names
ColNames []string
}

// TryExecute performs a non-streaming exec.
Expand All @@ -47,7 +62,7 @@ func (jv *ValuesJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars
return nil, err
}
bv := &querypb.BindVariable{
Type: querypb.Type_TUPLE,
Type: querypb.Type_ROW_TUPLE,
}
if len(lresult.Rows) == 0 && wantfields {
// If there are no rows, we still need to construct a single row
Expand All @@ -60,27 +75,43 @@ func (jv *ValuesJoin) TryExecute(ctx context.Context, vcursor VCursor, bindVars
}
bv.Values = append(bv.Values, sqltypes.TupleToProto(vals))

bindVars[jv.RowConstructorArg] = bv
bindVars[jv.BindVarName] = bv
if jv.RowID {
panic("implement me")
}
return jv.Right.GetFields(ctx, vcursor, bindVars)
}

rowSize := len(jv.CopyColumnsToRHS)
if jv.RowID {
rowSize++ // +1 since we add the row ID
}
for i, row := range lresult.Rows {
newRow := make(sqltypes.Row, 0, len(jv.Vars)+1) // +1 since we always add the row ID
newRow = append(newRow, sqltypes.NewInt64(int64(i))) // Adding the LHS row ID

for _, loffset := range jv.Vars {
newRow = append(newRow, row[loffset])
newRow := make(sqltypes.Row, 0, rowSize)

if jv.RowID {
for _, loffset := range jv.CopyColumnsToRHS {
newRow = append(newRow, row[loffset])
}
newRow = append(newRow, sqltypes.NewInt64(int64(i))) // Adding the LHS row ID
} else {
newRow = row
}

bv.Values = append(bv.Values, sqltypes.TupleToProto(newRow))
}

bindVars[jv.RowConstructorArg] = bv
bindVars[jv.BindVarName] = bv
rresult, err := vcursor.ExecutePrimitive(ctx, jv.Right, bindVars, wantfields)
if err != nil {
return nil, err
}

if !jv.RowID {
// if we are not using the row ID, we can just return the result from the RHS
return rresult, nil
}

result := &sqltypes.Result{}

result.Fields = joinFields(lresult.Fields, rresult.Fields, jv.Cols)
Expand Down Expand Up @@ -143,8 +174,9 @@ func (jv *ValuesJoin) description() PrimitiveDescription {
OperatorType: "Join",
Variant: "Values",
Other: map[string]any{
"ValuesArg": jv.RowConstructorArg,
"Vars": jv.Vars,
"BindVarName": jv.BindVarName,
"CopyColumnsToRHS": jv.CopyColumnsToRHS,
"RowID": fmt.Sprintf("%t", jv.RowID),
},
}
}
Loading
Loading