From 7709300b7379bbdd0c017f6d2ef42c862a72da4f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 15:21:26 -0500 Subject: [PATCH] Improve sorting and tests Signed-off-by: Matt Lord --- go/protoutil/binlogsource.go | 16 +++- go/protoutil/binlogsource_test.go | 96 ++++++++++++++++++- go/vt/vtctl/workflow/resharder.go | 9 +- .../vreplication/insert_generator.go | 3 +- 4 files changed, 119 insertions(+), 5 deletions(-) diff --git a/go/protoutil/binlogsource.go b/go/protoutil/binlogsource.go index f890543f485..385f472c202 100644 --- a/go/protoutil/binlogsource.go +++ b/go/protoutil/binlogsource.go @@ -31,16 +31,30 @@ func SortBinlogSourceTables(bls *binlogdatapb.BinlogSource) { if bls == nil { return } + // Sort the tables by name to ensure a consistent order. slices.Sort(bls.Tables) + if bls.Filter == nil || len(bls.Filter.Rules) == 0 { return } sort.Slice(bls.Filter.Rules, func(i, j int) bool { + // Exclude filters should logically be processed first. + if bls.Filter.Rules[i].Filter == "exclude" && bls.Filter.Rules[j].Filter != "exclude" { + return true + } + if bls.Filter.Rules[j].Filter == "exclude" && bls.Filter.Rules[i].Filter != "exclude" { + return false + } + // Remove preceding slash from the match string. // That is used when the filter is a regular expression. fi, _ := strings.CutPrefix(bls.Filter.Rules[i].Match, "/") fj, _ := strings.CutPrefix(bls.Filter.Rules[j].Match, "/") - return fi < fj + if fi != fj { + return fi < fj + } + + return bls.Filter.Rules[i].Filter < bls.Filter.Rules[j].Filter }) } diff --git a/go/protoutil/binlogsource_test.go b/go/protoutil/binlogsource_test.go index 438f6e0657d..fe5564535bd 100644 --- a/go/protoutil/binlogsource_test.go +++ b/go/protoutil/binlogsource_test.go @@ -46,6 +46,10 @@ func TestSortBinlogSourceTables(t *testing.T) { { Match: "/wuts", }, + { + Match: "1table", + Filter: "a", + }, { Match: "1table", }, @@ -62,6 +66,10 @@ func TestSortBinlogSourceTables(t *testing.T) { { Match: "1table", }, + { + Match: "1table", + Filter: "a", + }, { Match: "atable", }, @@ -78,6 +86,92 @@ func TestSortBinlogSourceTables(t *testing.T) { }, }, }, + { + name: "With excludes", + inSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "./*", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "ztable2", + }, + { + Match: "atable2", + }, + }, + }, + }, + outSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "./*", + }, + { + Match: "atable2", + }, + { + Match: "ztable2", + }, + }, + }, + }, + }, + { + name: "With excludes", + inSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "./*", + }, + }, + }, + }, + outSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "./*", + }, + }, + }, + }, + }, { name: "Nil", inSource: nil, @@ -109,7 +203,7 @@ func TestSortBinlogSourceTables(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { SortBinlogSourceTables(tt.inSource) - require.True(t, proto.Equal(tt.inSource, tt.outSource)) + require.True(t, proto.Equal(tt.inSource, tt.outSource), "got: %s, want: %s", tt.inSource.String(), tt.outSource.String()) }) } } diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index e36b546c1d2..ce0e0a13eb9 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "slices" "sync" "time" @@ -29,6 +30,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topotools" @@ -281,8 +283,8 @@ func (rs *resharder) createStreams(ctx context.Context) error { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName()) - // copy excludeRules to prevent data race. - copyExcludeRules := append([]*binlogdatapb.Rule(nil), excludeRules...) + // Clone excludeRules to prevent data races. + copyExcludeRules := slices.Clone(excludeRules) for _, source := range rs.sourceShards { if !key.KeyRangeIntersect(target.KeyRange, source.KeyRange) { continue @@ -307,13 +309,16 @@ func (rs *resharder) createStreams(ctx context.Context) error { } for _, rstream := range rs.refStreams { + log.Errorf("DEBUG: Before AddRow: %s", rstream.bls.String()) ig.AddRow(rstream.workflow, rstream.bls, "", rstream.cell, rstream.tabletTypes, // TODO: fix based on original stream. binlogdatapb.VReplicationWorkflowType_Reshard, binlogdatapb.VReplicationWorkflowSubType_None, rs.deferSecondaryKeys) + log.Errorf("DEBUG: After AddRow: %s", rstream.bls.String()) } query := ig.String() + log.Errorf("DEBUG: Full statement: %s", query) if _, err := rs.s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil { return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go index 1ab45d2414d..62af8c9396d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go @@ -22,8 +22,9 @@ import ( "time" "vitess.io/vitess/go/protoutil" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/throttler" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) // InsertGenerator generates a vreplication insert statement.