Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Enforce consistent order for table copies and diffs #15152

Merged
merged 11 commits into from
Feb 9, 2024
60 changes: 60 additions & 0 deletions go/protoutil/binlogsource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package protoutil

import (
"slices"
"sort"
"strings"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// SortBinlogSourceTables sorts the table related contents of the
// BinlogSource struct lexicographically by table name in order to
// produce consistent results.
func SortBinlogSourceTables(bls *binlogdatapb.BinlogSource) {
if bls == nil {
return
}

// Sort the tables by name to ensure a consistent order.
slices.Sort(bls.Tables)

if bls.Filter == nil || len(bls.Filter.Rules) == 0 {
return
}
sort.Slice(bls.Filter.Rules, func(i, j int) bool {
// Exclude filters should logically be processed first.
if bls.Filter.Rules[i].Filter == "exclude" && bls.Filter.Rules[j].Filter != "exclude" {
return true
}
if bls.Filter.Rules[j].Filter == "exclude" && bls.Filter.Rules[i].Filter != "exclude" {
return false
}

// Remove preceding slash from the match string.
// That is used when the filter is a regular expression.
fi, _ := strings.CutPrefix(bls.Filter.Rules[i].Match, "/")
fj, _ := strings.CutPrefix(bls.Filter.Rules[j].Match, "/")
if fi != fj {
return fi < fj
}

return bls.Filter.Rules[i].Filter < bls.Filter.Rules[j].Filter
})
}
209 changes: 209 additions & 0 deletions go/protoutil/binlogsource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package protoutil

import (
"testing"

"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

func TestSortBinlogSourceTables(t *testing.T) {
tests := []struct {
name string
inSource *binlogdatapb.BinlogSource
outSource *binlogdatapb.BinlogSource
}{
{
name: "Basic",
inSource: &binlogdatapb.BinlogSource{
Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"},
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "ztable2",
},
{
Match: "table3",
},
{
Match: "/wuts",
},
{
Match: "1table",
Filter: "a",
},
{
Match: "1table",
},
{
Match: "atable",
},
},
},
},
outSource: &binlogdatapb.BinlogSource{
Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"},
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "1table",
},
{
Match: "1table",
Filter: "a",
},
{
Match: "atable",
},
{
Match: "table3",
},
{
Match: "/wuts",
},
{
Match: "ztable2",
},
},
},
},
},
{
name: "With excludes",
inSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "./*",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "no2",
Filter: "exclude",
},
{
Match: "ztable2",
},
{
Match: "atable2",
},
},
},
},
outSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no2",
Filter: "exclude",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "./*",
},
{
Match: "atable2",
},
{
Match: "ztable2",
},
},
},
},
},
{
name: "With excludes",
inSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no4",
Filter: "exclude",
},
{
Match: "no2",
Filter: "exclude",
},
{
Match: "./*",
},
},
},
},
outSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no2",
Filter: "exclude",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "./*",
},
},
},
},
},
{
name: "Nil",
inSource: nil,
outSource: nil,
},
{
name: "No filter",
inSource: &binlogdatapb.BinlogSource{
Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"},
Filter: nil,
},
outSource: &binlogdatapb.BinlogSource{
Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"},
Filter: nil,
},
},
{
name: "No filter rules",
inSource: &binlogdatapb.BinlogSource{
Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"},
Filter: &binlogdatapb.Filter{},
},
outSource: &binlogdatapb.BinlogSource{
Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"},
Filter: &binlogdatapb.Filter{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
SortBinlogSourceTables(tt.inSource)
require.True(t, proto.Equal(tt.inSource, tt.outSource), "got: %s, want: %s", tt.inSource.String(), tt.outSource.String())
})
}
}
5 changes: 3 additions & 2 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex

func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cells, uuid string, completedAtMin time.Time) *vdiffInfo {
var info *vdiffInfo
var jsonStr string
first := true
previousProgress := vdiff2.ProgressReport{}
ch := make(chan bool)
go func() {
for {
time.Sleep(vdiffStatusCheckInterval)
_, jsonStr := performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false)
_, jsonStr = performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false)
info = getVDiffInfo(jsonStr)
require.NotNil(t, info)
if info.State == "completed" {
Expand Down Expand Up @@ -142,7 +143,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell
case <-ch:
return info
case <-time.After(vdiffTimeout):
log.Errorf("VDiff never completed for UUID %s", uuid)
log.Errorf("VDiff never completed for UUID %s. Latest output: %s", uuid, jsonStr)
require.FailNow(t, fmt.Sprintf("VDiff never completed for UUID %s", uuid))
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,13 @@ import (
"time"

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"

"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/history"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -606,6 +605,7 @@ func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error) {
// the _vt.vreplication table.
func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string {
protoutil.SortBinlogSourceTables(source)
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)",
Expand All @@ -616,6 +616,7 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi
// CreateVReplicationState returns a statement to create a stopped vreplication.
func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position string, state binlogdatapb.VReplicationWorkflowState, dbName string,
workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string {
protoutil.SortBinlogSourceTables(source)
return fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) "+
"values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d)",
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vtctl/workflow/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"context"
"errors"
"fmt"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -281,8 +282,8 @@

ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName())

// copy excludeRules to prevent data race.
copyExcludeRules := append([]*binlogdatapb.Rule(nil), excludeRules...)
// Clone excludeRules to prevent data races.
copyExcludeRules := slices.Clone(excludeRules)

Check warning on line 286 in go/vt/vtctl/workflow/resharder.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/resharder.go#L286

Added line #L286 was not covered by tests
for _, source := range rs.sourceShards {
if !key.KeyRangeIntersect(target.KeyRange, source.KeyRange) {
continue
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/rpc_vreplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"google.golang.org/protobuf/encoding/prototext"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/discovery"
Expand Down Expand Up @@ -57,6 +58,7 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta
}
res := &sqltypes.Result{}
for _, bls := range req.BinlogSource {
protoutil.SortBinlogSourceTables(bls)
source, err := prototext.Marshal(bls)
if err != nil {
return nil, err
Expand Down
Loading
Loading