Skip to content

Commit

Permalink
Vindexes: Pass context in consistent lookup handleDup (#14653)
Browse files Browse the repository at this point in the history
Signed-off-by: Brendan Dougherty <brendan.dougherty@shopify.com>
  • Loading branch information
brendar authored Jan 8, 2024
1 parent ab76091 commit 6f23f60
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 26 deletions.
3 changes: 1 addition & 2 deletions go/vt/vtgate/vindexes/consistent_lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,8 +384,7 @@ func (lu *clCommon) handleDup(ctx context.Context, vcursor VCursor, values []sql
return err
}
// Lock the target row using normal transaction priority.
// TODO: context needs to be passed on.
qr, err = vcursor.ExecuteKeyspaceID(context.Background(), lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */)
qr, err = vcursor.ExecuteKeyspaceID(ctx, lu.keyspace, existingksid, lu.lockOwnerQuery, bindVars, false /* rollbackOnError */, false /* autocommit */)
if err != nil {
return err
}
Expand Down
90 changes: 66 additions & 24 deletions go/vt/vtgate/vindexes/consistent_lookup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ func TestConsistentLookupMap(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResultLookup([]int{2, 2}), nil)
ctx := newTestContext()

got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
require.NoError(t, err)
want := []key.Destination{
key.DestinationKeyspaceIDs([][]byte{
Expand All @@ -135,10 +136,11 @@ func TestConsistentLookupMap(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
})
vc.verifyContext(t, ctx)

// Test query fail.
vc.AddResult(nil, fmt.Errorf("execute failed"))
_, err = lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)})
_, err = lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)})
wantErr := "lookup.Map: execute failed"
if err == nil || err.Error() != wantErr {
t.Errorf("lookup(query fail) err: %v, want %s", err, wantErr)
Expand Down Expand Up @@ -167,8 +169,9 @@ func TestConsistentLookupUniqueMap(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup_unique", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResultLookup([]int{0, 1}), nil)
ctx := newTestContext()

got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
require.NoError(t, err)
want := []key.Destination{
key.DestinationNone{},
Expand All @@ -180,10 +183,11 @@ func TestConsistentLookupUniqueMap(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
})
vc.verifyContext(t, ctx)

// More than one result is invalid
vc.AddResult(makeTestResultLookup([]int{2}), nil)
_, err = lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)})
_, err = lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)})
wanterr := "Lookup.Map: unexpected multiple results from vindex t: INT64(1)"
if err == nil || err.Error() != wanterr {
t.Errorf("lookup(query fail) err: %v, want %s", err, wanterr)
Expand Down Expand Up @@ -212,8 +216,9 @@ func TestConsistentLookupMapAbsent(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResultLookup([]int{0, 0}), nil)
ctx := newTestContext()

got, err := lookup.Map(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
got, err := lookup.Map(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)})
require.NoError(t, err)
want := []key.Destination{
key.DestinationNone{},
Expand All @@ -225,32 +230,35 @@ func TestConsistentLookupMapAbsent(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre select fromc1, toc from t where fromc1 in ::fromc1 [{fromc1 }] false",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupVerify(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(makeTestResult(1), nil)
ctx := newTestContext()

_, err := lookup.Verify(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")})
_, err := lookup.Verify(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte("test1"), []byte("test2")})
require.NoError(t, err)
vc.verifyLog(t, []string{
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 1} {toc test1}] false",
"ExecutePre select fromc1 from t where fromc1 = :fromc1 and toc = :toc [{fromc1 2} {toc test2}] false",
})
vc.verifyContext(t, ctx)

// Test query fail.
vc.AddResult(nil, fmt.Errorf("execute failed"))
_, err = lookup.Verify(context.Background(), vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")})
_, err = lookup.Verify(ctx, vc, []sqltypes.Value{sqltypes.NewInt64(1)}, [][]byte{[]byte("\x16k@\xb4J\xbaK\xd6")})
want := "lookup.Verify: execute failed"
if err == nil || err.Error() != want {
t.Errorf("lookup(query fail) err: %v, want %s", err, want)
}

// Test write_only.
lookup = createConsistentLookup(t, "consistent_lookup", true)
got, err := lookup.Verify(context.Background(), nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")})
got, err := lookup.Verify(ctx, nil, []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}, [][]byte{[]byte(""), []byte("")})
require.NoError(t, err)
wantBools := []bool{true, true}
if !reflect.DeepEqual(got, wantBools) {
Expand All @@ -262,8 +270,9 @@ func TestConsistentLookupCreateSimple(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}, {
Expand All @@ -275,6 +284,7 @@ func TestConsistentLookupCreateSimple(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0), (:fromc1_1, :fromc2_1, :toc_1) [{fromc1_0 1} {fromc1_1 3} {fromc2_0 2} {fromc2_1 4} {toc_0 test1} {toc_1 test2}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenRecreate(t *testing.T) {
Expand All @@ -283,8 +293,9 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) {
vc.AddResult(nil, sqlerror.NewSQLError(sqlerror.ERDupEntry, sqlerror.SSConstraintViolation, "Duplicate entry"))
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false); err != nil {
Expand All @@ -295,6 +306,7 @@ func TestConsistentLookupCreateThenRecreate(t *testing.T) {
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1, :fromc2, :toc) [{fromc1 1} {fromc2 2} {toc test1}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenUpdate(t *testing.T) {
Expand All @@ -304,8 +316,9 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) {
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false); err != nil {
Expand All @@ -317,6 +330,7 @@ func TestConsistentLookupCreateThenUpdate(t *testing.T) {
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecutePre update t set toc=:toc where fromc1 = :fromc1 and fromc2 = :fromc2 [{fromc1 1} {fromc2 2} {toc test1}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
Expand All @@ -326,8 +340,9 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("1")}, false); err != nil {
Expand All @@ -338,6 +353,7 @@ func TestConsistentLookupCreateThenSkipUpdate(t *testing.T) {
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc 1}] false",
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc 1}] false",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenDupkey(t *testing.T) {
Expand All @@ -347,8 +363,9 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) {
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(makeTestResult(1), nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false)
Expand All @@ -359,14 +376,16 @@ func TestConsistentLookupCreateThenDupkey(t *testing.T) {
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
"ExecuteKeyspaceID select fc1 from `dot.t1` where fc1 = :fromc1 and fc2 = :fromc2 lock in share mode [{fromc1 1} {fromc2 2} {toc test1}] false",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateNonDupError(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, errors.New("general error"))
ctx := newTestContext()

err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false)
Expand All @@ -377,15 +396,17 @@ func TestConsistentLookupCreateNonDupError(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 1} {fromc2_0 2} {toc_0 test1}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupCreateThenBadRows(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(nil, vterrors.New(vtrpcpb.Code_ALREADY_EXISTS, "(errno 1062) (sqlstate 23000) Duplicate entry"))
vc.AddResult(makeTestResult(2), nil)
ctx := newTestContext()

err := lookup.(Lookup).Create(context.Background(), vc, [][]sqltypes.Value{{
err := lookup.(Lookup).Create(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, [][]byte{[]byte("test1")}, false)
Expand All @@ -397,14 +418,16 @@ func TestConsistentLookupCreateThenBadRows(t *testing.T) {
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 1} {fromc2_0 2} {toc_0 test1}] true",
"ExecutePre select toc from t where fromc1 = :fromc1 and fromc2 = :fromc2 for update [{fromc1 1} {fromc2 2} {toc test1}] false",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupDelete(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Delete(context.Background(), vc, [][]sqltypes.Value{{
if err := lookup.(Lookup).Delete(ctx, vc, [][]sqltypes.Value{{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}}, []byte("test")); err != nil {
Expand All @@ -413,15 +436,17 @@ func TestConsistentLookupDelete(t *testing.T) {
vc.verifyLog(t, []string{
"ExecutePost delete from t where fromc1 = :fromc1 and fromc2 = :fromc2 and toc = :toc [{fromc1 1} {fromc2 2} {toc test}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupUpdate(t *testing.T) {
lookup := createConsistentLookup(t, "consistent_lookup", false)
vc := &loggingVCursor{}
vc.AddResult(&sqltypes.Result{}, nil)
vc.AddResult(&sqltypes.Result{}, nil)
ctx := newTestContext()

if err := lookup.(Lookup).Update(context.Background(), vc, []sqltypes.Value{
if err := lookup.(Lookup).Update(ctx, vc, []sqltypes.Value{
sqltypes.NewInt64(1),
sqltypes.NewInt64(2),
}, []byte("test"), []sqltypes.Value{
Expand All @@ -434,6 +459,7 @@ func TestConsistentLookupUpdate(t *testing.T) {
"ExecutePost delete from t where fromc1 = :fromc1 and fromc2 = :fromc2 and toc = :toc [{fromc1 1} {fromc2 2} {toc test}] true",
"ExecutePre insert into t(fromc1, fromc2, toc) values(:fromc1_0, :fromc2_0, :toc_0) [{fromc1_0 3} {fromc2_0 4} {toc_0 test}] true",
})
vc.verifyContext(t, ctx)
}

func TestConsistentLookupNoUpdate(t *testing.T) {
Expand Down Expand Up @@ -510,13 +536,19 @@ func createConsistentLookup(t *testing.T, name string, writeOnly bool) SingleCol
return l.(SingleColumn)
}

func newTestContext() context.Context {
type testContextKey string // keep static checks from complaining about built-in types as context keys
return context.WithValue(context.Background(), (testContextKey)("test"), "foo")
}

var _ VCursor = (*loggingVCursor)(nil)

type loggingVCursor struct {
results []*sqltypes.Result
errors []error
index int
log []string
results []*sqltypes.Result
errors []error
index int
log []string
contexts []context.Context
}

func (vc *loggingVCursor) LookupRowLockShardSession() vtgatepb.CommitOrder {
Expand Down Expand Up @@ -557,14 +589,14 @@ func (vc *loggingVCursor) Execute(ctx context.Context, method string, query stri
case vtgatepb.CommitOrder_AUTOCOMMIT:
name = "ExecuteAutocommit"
}
return vc.execute(name, query, bindvars, rollbackOnError)
return vc.execute(ctx, name, query, bindvars, rollbackOnError)
}

func (vc *loggingVCursor) ExecuteKeyspaceID(ctx context.Context, keyspace string, ksid []byte, query string, bindVars map[string]*querypb.BindVariable, rollbackOnError, autocommit bool) (*sqltypes.Result, error) {
return vc.execute("ExecuteKeyspaceID", query, bindVars, rollbackOnError)
return vc.execute(ctx, "ExecuteKeyspaceID", query, bindVars, rollbackOnError)
}

func (vc *loggingVCursor) execute(method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) {
func (vc *loggingVCursor) execute(ctx context.Context, method string, query string, bindvars map[string]*querypb.BindVariable, rollbackOnError bool) (*sqltypes.Result, error) {
if vc.index >= len(vc.results) {
return nil, fmt.Errorf("ran out of results to return: %s", query)
}
Expand All @@ -574,6 +606,7 @@ func (vc *loggingVCursor) execute(method string, query string, bindvars map[stri
}
sort.Slice(bvl, func(i, j int) bool { return bvl[i].Name < bvl[j].Name })
vc.log = append(vc.log, fmt.Sprintf("%s %s %v %v", method, query, bvl, rollbackOnError))
vc.contexts = append(vc.contexts, ctx)
idx := vc.index
vc.index++
if vc.errors[idx] != nil {
Expand All @@ -597,6 +630,15 @@ func (vc *loggingVCursor) verifyLog(t *testing.T, want []string) {
}
}

func (vc *loggingVCursor) verifyContext(t *testing.T, want context.Context) {
t.Helper()
for i, got := range vc.contexts {
if got != want {
t.Errorf("context(%d):\ngot: %v\nwant: %v", i, got, want)
}
}
}

// create lookup result with one to one mapping
func makeTestResult(numRows int) *sqltypes.Result {
result := &sqltypes.Result{
Expand Down

0 comments on commit 6f23f60

Please sign in to comment.