Skip to content

Commit

Permalink
Improve sorting and tests
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 7, 2024
1 parent 024fd02 commit 7709300
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 5 deletions.
16 changes: 15 additions & 1 deletion go/protoutil/binlogsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,30 @@ func SortBinlogSourceTables(bls *binlogdatapb.BinlogSource) {
if bls == nil {
return
}

// Sort the tables by name to ensure a consistent order.
slices.Sort(bls.Tables)

if bls.Filter == nil || len(bls.Filter.Rules) == 0 {
return
}
sort.Slice(bls.Filter.Rules, func(i, j int) bool {
// Exclude filters should logically be processed first.
if bls.Filter.Rules[i].Filter == "exclude" && bls.Filter.Rules[j].Filter != "exclude" {
return true
}
if bls.Filter.Rules[j].Filter == "exclude" && bls.Filter.Rules[i].Filter != "exclude" {
return false
}

// Remove preceding slash from the match string.
// That is used when the filter is a regular expression.
fi, _ := strings.CutPrefix(bls.Filter.Rules[i].Match, "/")
fj, _ := strings.CutPrefix(bls.Filter.Rules[j].Match, "/")
return fi < fj
if fi != fj {
return fi < fj
}

return bls.Filter.Rules[i].Filter < bls.Filter.Rules[j].Filter
})
}
96 changes: 95 additions & 1 deletion go/protoutil/binlogsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ func TestSortBinlogSourceTables(t *testing.T) {
{
Match: "/wuts",
},
{
Match: "1table",
Filter: "a",
},
{
Match: "1table",
},
Expand All @@ -62,6 +66,10 @@ func TestSortBinlogSourceTables(t *testing.T) {
{
Match: "1table",
},
{
Match: "1table",
Filter: "a",
},
{
Match: "atable",
},
Expand All @@ -78,6 +86,92 @@ func TestSortBinlogSourceTables(t *testing.T) {
},
},
},
{
name: "With excludes",
inSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "./*",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "no2",
Filter: "exclude",
},
{
Match: "ztable2",
},
{
Match: "atable2",
},
},
},
},
outSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no2",
Filter: "exclude",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "./*",
},
{
Match: "atable2",
},
{
Match: "ztable2",
},
},
},
},
},
{
name: "With excludes",
inSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no4",
Filter: "exclude",
},
{
Match: "no2",
Filter: "exclude",
},
{
Match: "./*",
},
},
},
},
outSource: &binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{
Match: "no2",
Filter: "exclude",
},
{
Match: "no4",
Filter: "exclude",
},
{
Match: "./*",
},
},
},
},
},
{
name: "Nil",
inSource: nil,
Expand Down Expand Up @@ -109,7 +203,7 @@ func TestSortBinlogSourceTables(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
SortBinlogSourceTables(tt.inSource)
require.True(t, proto.Equal(tt.inSource, tt.outSource))
require.True(t, proto.Equal(tt.inSource, tt.outSource), "got: %s, want: %s", tt.inSource.String(), tt.outSource.String())
})
}
}
9 changes: 7 additions & 2 deletions go/vt/vtctl/workflow/resharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"slices"
"sync"
"time"

Expand All @@ -29,6 +30,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -281,8 +283,8 @@ func (rs *resharder) createStreams(ctx context.Context) error {

ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName())

// copy excludeRules to prevent data race.
copyExcludeRules := append([]*binlogdatapb.Rule(nil), excludeRules...)
// Clone excludeRules to prevent data races.
copyExcludeRules := slices.Clone(excludeRules)

Check warning on line 287 in go/vt/vtctl/workflow/resharder.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/resharder.go#L287

Added line #L287 was not covered by tests
for _, source := range rs.sourceShards {
if !key.KeyRangeIntersect(target.KeyRange, source.KeyRange) {
continue
Expand All @@ -307,13 +309,16 @@ func (rs *resharder) createStreams(ctx context.Context) error {
}

for _, rstream := range rs.refStreams {
log.Errorf("DEBUG: Before AddRow: %s", rstream.bls.String())

Check warning on line 312 in go/vt/vtctl/workflow/resharder.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/resharder.go#L312

Added line #L312 was not covered by tests
ig.AddRow(rstream.workflow, rstream.bls, "", rstream.cell, rstream.tabletTypes,
// TODO: fix based on original stream.
binlogdatapb.VReplicationWorkflowType_Reshard,
binlogdatapb.VReplicationWorkflowSubType_None,
rs.deferSecondaryKeys)
log.Errorf("DEBUG: After AddRow: %s", rstream.bls.String())

Check warning on line 318 in go/vt/vtctl/workflow/resharder.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/resharder.go#L318

Added line #L318 was not covered by tests
}
query := ig.String()
log.Errorf("DEBUG: Full statement: %s", query)

Check warning on line 321 in go/vt/vtctl/workflow/resharder.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vtctl/workflow/resharder.go#L321

Added line #L321 was not covered by tests
if _, err := rs.s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ import (
"time"

"vitess.io/vitess/go/protoutil"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/throttler"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)

// InsertGenerator generates a vreplication insert statement.
Expand Down

0 comments on commit 7709300

Please sign in to comment.