From 0b86874728631551fb33e8ac6026ac9c213c444a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 6 Feb 2024 13:50:59 -0500 Subject: [PATCH 01/11] Enforce consistent order for table copies and diffs Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/rpc_vreplication.go | 5 +++++ go/vt/vttablet/tabletmanager/vdiff/schema.go | 6 +++--- .../tabletmanager/vreplication/vcopier.go | 15 ++++++++++----- 3 files changed, 18 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 0319146321a..cf1d3946ed5 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -18,6 +18,7 @@ package tabletmanager import ( "context" + "slices" "strings" "google.golang.org/protobuf/encoding/prototext" @@ -57,6 +58,8 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta } res := &sqltypes.Result{} for _, bls := range req.BinlogSource { + // Sort the tables by name to ensure a consistent order. + slices.Sort(bls.Tables) source, err := prototext.Marshal(bls) if err != nil { return nil, err @@ -294,6 +297,8 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta if !textutil.ValueIsSimulatedNull(req.OnDdl) { bls.OnDdl = req.OnDdl } + // Sort the tables by name to ensure a consistent order. + slices.Sort(bls.Tables) source, err = prototext.Marshal(bls) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/vdiff/schema.go b/go/vt/vttablet/tabletmanager/vdiff/schema.go index 31e686877a2..afb79b4e4b3 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/schema.go +++ b/go/vt/vttablet/tabletmanager/vdiff/schema.go @@ -37,7 +37,7 @@ const ( vd.started_at as started_at, vdt.rows_compared as rows_compared, vd.completed_at as completed_at, IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id) - where vd.id = %a` + where vd.id = %a order by table_name` // sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`. // It also truncates the error if needed to ensure that we can save the state when the error text is very long. sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d" @@ -49,7 +49,7 @@ const ( sqlGetVDiffID = "select id as id from _vt.vdiff where vdiff_uuid = %a" sqlGetVDiffIDsByKeyspaceWorkflow = "select id as id from _vt.vdiff where keyspace = %a and workflow = %a" sqlGetTableRows = "select table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %a and table_name = %a" - sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s)" + sqlGetAllTableRows = "select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = %s and table_name in (%s) order by table_name" sqlNewVDiffTable = "insert into _vt.vdiff_table(vdiff_id, table_name, state, table_rows) values(%a, %a, 'pending', %a)" sqlGetVDiffTable = `select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report @@ -62,5 +62,5 @@ const ( sqlUpdateTableStateAndReport = "update _vt.vdiff_table set state = %a, rows_compared = %a, report = %a where vdiff_id = %a and table_name = %a" sqlUpdateTableMismatch = "update _vt.vdiff_table set mismatch = true where vdiff_id = %a and table_name = %a" - sqlGetIncompleteTables = "select table_name as table_name from _vt.vdiff_table where vdiff_id = %a and state != 'completed'" + sqlGetIncompleteTables = "select table_name as table_name from _vt.vdiff_table where vdiff_id = %a and state != 'completed' order by table_name" ) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go index 4779e607960..dfe51f71dbd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier.go @@ -21,10 +21,12 @@ import ( "fmt" "io" "math" + "slices" "strconv" "strings" "time" + "golang.org/x/exp/maps" "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/bytes2" @@ -230,9 +232,12 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { if len(plan.TargetTables) != 0 { var buf strings.Builder buf.WriteString("insert into _vt.copy_state(vrepl_id, table_name) values ") + // Sort the tables by name to ensure a consistent order. + tableNames := maps.Keys(plan.TargetTables) + slices.Sort(tableNames) prefix := "" - for name := range plan.TargetTables { - fmt.Fprintf(&buf, "%s(%d, %s)", prefix, vc.vr.id, encodeString(name)) + for _, tableName := range tableNames { + fmt.Fprintf(&buf, "%s(%d, %s)", prefix, vc.vr.id, encodeString(tableName)) prefix = ", " } if _, err := vc.vr.dbClient.Execute(buf.String()); err != nil { @@ -256,8 +261,8 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { len(plan.TargetTables))); err != nil { return err } - for name := range plan.TargetTables { - if err := vc.vr.stashSecondaryKeys(ctx, name); err != nil { + for _, tableName := range tableNames { + if err := vc.vr.stashSecondaryKeys(ctx, tableName); err != nil { return err } } @@ -294,7 +299,7 @@ func (vc *vcopier) initTablesForCopy(ctx context.Context) error { // primary key that was copied. A nil Result means that nothing has been copied. // A table that was fully copied is removed from copyState. func (vc *vcopier) copyNext(ctx context.Context, settings binlogplayer.VRSettings) error { - qr, err := vc.vr.dbClient.Execute(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state group by vrepl_id, table_name)", vc.vr.id)) + qr, err := vc.vr.dbClient.Execute(fmt.Sprintf("select table_name, lastpk from _vt.copy_state where vrepl_id = %d and id in (select max(id) from _vt.copy_state group by vrepl_id, table_name) order by table_name", vc.vr.id)) if err != nil { return err } From 86dc5697a47b0e51f2dd6eb09eabb5aceb35b499 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 6 Feb 2024 14:07:25 -0500 Subject: [PATCH 02/11] Adjust existing tests Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/engine_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go index ca548a9a478..e6c9a84d9e2 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine_test.go @@ -147,7 +147,7 @@ func TestVDiff(t *testing.T) { ), "NULL", ), nil) - vdenv.dbClient.ExpectRequest(fmt.Sprintf("select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = '%s' and table_name in ('t1')", vdiffDBName), sqltypes.MakeTestResult(sqltypes.MakeTestFields( + vdenv.dbClient.ExpectRequest(fmt.Sprintf("select table_name as table_name, table_rows as table_rows from INFORMATION_SCHEMA.TABLES where table_schema = '%s' and table_name in ('t1') order by table_name", vdiffDBName), sqltypes.MakeTestResult(sqltypes.MakeTestFields( "table_name|table_rows", "varchar|int64", ), @@ -192,7 +192,7 @@ func TestVDiff(t *testing.T) { vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil) vdenv.dbClient.ExpectRequest("update _vt.vdiff_table set state = 'completed' where vdiff_id = 1 and table_name = 't1'", singleRowAffected, nil) vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil) - vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed'", singleRowAffected, nil) + vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed' order by table_name", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = left('', 1024) , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil) vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: completed')", singleRowAffected, nil) From 34ccb3f9f3dcc41827afee4a9c3ccc5757fcd1af Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 6 Feb 2024 14:31:47 -0500 Subject: [PATCH 03/11] Sort the binlog source filter rules Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/rpc_vreplication.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index cf1d3946ed5..3ce84d01e2b 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -19,6 +19,7 @@ package tabletmanager import ( "context" "slices" + "sort" "strings" "google.golang.org/protobuf/encoding/prototext" @@ -60,6 +61,10 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta for _, bls := range req.BinlogSource { // Sort the tables by name to ensure a consistent order. slices.Sort(bls.Tables) + sort.Slice(bls.Filter.Rules, func(i, j int) bool { + return bls.Filter.Rules[i].Match < bls.Filter.Rules[j].Match + }) + source, err := prototext.Marshal(bls) if err != nil { return nil, err From 91c3f06bf06ed98feddbff9e5334fc8becb0e18f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 6 Feb 2024 16:05:52 -0500 Subject: [PATCH 04/11] Add sorting utility function and update tests Signed-off-by: Matt Lord --- go/protoutil/binlogsource.go | 40 +++++++++ go/protoutil/binlogsource_test.go | 90 +++++++++++++++++++ go/vt/binlog/binlogplayer/binlog_player.go | 9 +- .../tabletmanager/rpc_vreplication.go | 13 +-- .../tabletmanager/rpc_vreplication_test.go | 86 +++++++++++++++++- .../vreplication/vcopier_test.go | 25 ++++-- 6 files changed, 241 insertions(+), 22 deletions(-) create mode 100644 go/protoutil/binlogsource.go create mode 100644 go/protoutil/binlogsource_test.go diff --git a/go/protoutil/binlogsource.go b/go/protoutil/binlogsource.go new file mode 100644 index 00000000000..7f37f78b6ab --- /dev/null +++ b/go/protoutil/binlogsource.go @@ -0,0 +1,40 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protoutil + +import ( + "slices" + "sort" + "strings" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +// SortBinlogSourceTables sorts the table related contents of the +// BinlogSource struct lexicographically by table name in order to +// produce consistent results. +func SortBinlogSourceTables(bls *binlogdatapb.BinlogSource) { + // Sort the tables by name to ensure a consistent order. + slices.Sort(bls.Tables) + sort.Slice(bls.Filter.Rules, func(i, j int) bool { + // Remove preceding slash from the match string. + // That is used when the filter is a regular expression. + fi, _ := strings.CutPrefix(bls.Filter.Rules[i].Match, "/") + fj, _ := strings.CutPrefix(bls.Filter.Rules[j].Match, "/") + return fi < fj + }) +} diff --git a/go/protoutil/binlogsource_test.go b/go/protoutil/binlogsource_test.go new file mode 100644 index 00000000000..7429a27a8fa --- /dev/null +++ b/go/protoutil/binlogsource_test.go @@ -0,0 +1,90 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package protoutil + +import ( + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/proto" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestSortBinlogSourceTables(t *testing.T) { + tests := []struct { + name string + inSource *binlogdatapb.BinlogSource + outSource *binlogdatapb.BinlogSource + }{ + { + name: "Basic", + inSource: &binlogdatapb.BinlogSource{ + Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"}, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "ztable2", + }, + { + Match: "table3", + }, + { + Match: "/wuts", + }, + { + Match: "1table", + }, + { + Match: "atable", + }, + }, + }, + }, + outSource: &binlogdatapb.BinlogSource{ + Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"}, + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "1table", + }, + { + Match: "atable", + }, + { + Match: "table3", + }, + { + Match: "/wuts", + }, + { + Match: "ztable2", + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require.NotNil(t, tt.inSource, "no input source provided") + require.NotNil(t, tt.outSource, "no output source provided") + SortBinlogSourceTables(tt.inSource) + require.True(t, proto.Equal(tt.inSource, tt.outSource)) + }) + } +} diff --git a/go/vt/binlog/binlogplayer/binlog_player.go b/go/vt/binlog/binlogplayer/binlog_player.go index f651f3bb25c..88c4093f451 100644 --- a/go/vt/binlog/binlogplayer/binlog_player.go +++ b/go/vt/binlog/binlogplayer/binlog_player.go @@ -34,14 +34,13 @@ import ( "time" "github.com/spf13/pflag" - - "vitess.io/vitess/go/mysql/replication" - "vitess.io/vitess/go/mysql/sqlerror" - "google.golang.org/protobuf/proto" "vitess.io/vitess/go/history" "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/log" @@ -606,6 +605,7 @@ func ReadVRSettings(dbClient DBClient, uid int32) (VRSettings, error) { // the _vt.vreplication table. func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, position string, maxTPS, maxReplicationLag, timeUpdated int64, dbName string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) string { + protoutil.SortBinlogSourceTables(source) return fmt.Sprintf("insert into _vt.vreplication "+ "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type, defer_secondary_keys) "+ "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)", @@ -616,6 +616,7 @@ func CreateVReplication(workflow string, source *binlogdatapb.BinlogSource, posi // CreateVReplicationState returns a statement to create a stopped vreplication. func CreateVReplicationState(workflow string, source *binlogdatapb.BinlogSource, position string, state binlogdatapb.VReplicationWorkflowState, dbName string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType) string { + protoutil.SortBinlogSourceTables(source) return fmt.Sprintf("insert into _vt.vreplication "+ "(workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) "+ "values (%v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d)", diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 3ce84d01e2b..3082f910b06 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -18,13 +18,12 @@ package tabletmanager import ( "context" - "slices" - "sort" "strings" "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/protoutil" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/discovery" @@ -59,12 +58,7 @@ func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *ta } res := &sqltypes.Result{} for _, bls := range req.BinlogSource { - // Sort the tables by name to ensure a consistent order. - slices.Sort(bls.Tables) - sort.Slice(bls.Filter.Rules, func(i, j int) bool { - return bls.Filter.Rules[i].Match < bls.Filter.Rules[j].Match - }) - + protoutil.SortBinlogSourceTables(bls) source, err := prototext.Marshal(bls) if err != nil { return nil, err @@ -302,8 +296,7 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta if !textutil.ValueIsSimulatedNull(req.OnDdl) { bls.OnDdl = req.OnDdl } - // Sort the tables by name to ensure a consistent order. - slices.Sort(bls.Tables) + protoutil.SortBinlogSourceTables(bls) source, err = prototext.Marshal(bls) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index 5ef9b4cd8c6..f760d19d67d 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -45,7 +45,7 @@ import ( topodatapb "vitess.io/vitess/go/vt/proto/topodata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - "vitess.io/vitess/go/vt/proto/vttime" + vttimepb "vitess.io/vitess/go/vt/proto/vttime" ) const ( @@ -167,6 +167,86 @@ func TestCreateVReplicationWorkflow(t *testing.T) { query: fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"}} on_ddl:EXEC stop_after_copy:true source_time_zone:\"EDT\" target_time_zone:\"UTC\"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1)`, insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), }, + { + name: "binlog source order with include", + schema: &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "zt", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + { + Name: "t1", + Columns: []string{"id", "c2"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id|c2", "int64|int64"), + }, + { + Name: "wut", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + }, + }, + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + IncludeTables: []string{"zt", "wut", "t1"}, + SourceTimeZone: "EDT", + OnDdl: binlogdatapb.OnDDLAction_EXEC.String(), + StopAfterCopy: true, + DropForeignKeys: true, + DeferSecondaryKeys: true, + AutoStart: true, + }, + query: fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"} rules:{match:\"wut\" filter:\"select * from wut\"} rules:{match:\"zt\" filter:\"select * from zt\"}} on_ddl:EXEC stop_after_copy:true source_time_zone:\"EDT\" target_time_zone:\"UTC\"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1)`, + insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + }, + { + name: "binlog source order with all-tables", + schema: &tabletmanagerdatapb.SchemaDefinition{ + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: "zt", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + { + Name: "t1", + Columns: []string{"id", "c2"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id|c2", "int64|int64"), + }, + { + Name: "wut", + Columns: []string{"id"}, + PrimaryKeyColumns: []string{"id"}, + Fields: sqltypes.MakeTestFields("id", "int64"), + }, + }, + }, + req: &vtctldatapb.MoveTablesCreateRequest{ + SourceKeyspace: sourceKs, + TargetKeyspace: targetKs, + Workflow: wf, + Cells: tenv.cells, + AllTables: true, + SourceTimeZone: "EDT", + OnDdl: binlogdatapb.OnDDLAction_EXEC.String(), + StopAfterCopy: true, + DropForeignKeys: true, + DeferSecondaryKeys: true, + AutoStart: true, + }, + query: fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1\"} rules:{match:\"wut\" filter:\"select * from wut\"} rules:{match:\"zt\" filter:\"select * from zt\"}} on_ddl:EXEC stop_after_copy:true source_time_zone:\"EDT\" target_time_zone:\"UTC\"', '', 0, 0, '%s', '', now(), 0, 'Stopped', '%s', 1, 0, 1)`, + insertVReplicationPrefix, wf, sourceKs, shard, tenv.cells[0], tenv.dbName), + }, } tenv.tmc.setVReplicationExecResults(targetTablet.tablet, fmt.Sprintf("select 1 from _vt.vreplication where db_name='vt_%s' and workflow='%s'", @@ -426,7 +506,7 @@ func TestMoveTables(t *testing.T) { Keyspace: targetKs, Workflow: wf, Cells: tenv.cells, - MaxReplicationLagAllowed: &vttime.Duration{Seconds: 922337203}, + MaxReplicationLagAllowed: &vttimepb.Duration{Seconds: 922337203}, EnableReverseReplication: true, InitializeTargetSequences: true, Direction: int32(workflow.DirectionForward), @@ -447,7 +527,7 @@ func TestMoveTables(t *testing.T) { Keyspace: targetKs, Workflow: wf, Cells: tenv.cells, - MaxReplicationLagAllowed: &vttime.Duration{Seconds: 922337203}, + MaxReplicationLagAllowed: &vttimepb.Duration{Seconds: 922337203}, EnableReverseReplication: true, Direction: int32(workflow.DirectionBackward), }) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 0e35036321f..c32482641b2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os" + "regexp" "strings" "testing" "time" @@ -562,15 +563,19 @@ func testPlayerCopyTables(t *testing.T) { defer deleteTablet(addTablet(100)) execStatements(t, []string{ + "create table ast1(id int, primary key(id))", "create table src1(id int, val varbinary(128), d decimal(8,0), j json, primary key(id))", "insert into src1 values(2, 'bbb', 1, '{\"foo\": \"bar\"}'), (1, 'aaa', 0, JSON_ARRAY(123456789012345678901234567890, \"abcd\")), (3, 'ccc', 2, 'null'), (4, 'ddd', 3, '{\"name\": \"matt\", \"size\": null}'), (5, 'eee', 4, null)", + fmt.Sprintf("create table %s.ast1(id int, primary key(id))", vrepldb), fmt.Sprintf("create table %s.dst1(id int, val varbinary(128), val2 varbinary(128), d decimal(8,0), j json, primary key(id))", vrepldb), "create table yes(id int, val varbinary(128), primary key(id))", fmt.Sprintf("create table %s.yes(id int, val varbinary(128), primary key(id))", vrepldb), "create table no(id int, val varbinary(128), primary key(id))", }) defer execStatements(t, []string{ + "drop table ast1", "drop table src1", + fmt.Sprintf("drop table %s.ast1", vrepldb), fmt.Sprintf("drop table %s.dst1", vrepldb), "drop table yes", fmt.Sprintf("drop table %s.yes", vrepldb), @@ -582,6 +587,9 @@ func testPlayerCopyTables(t *testing.T) { Rules: []*binlogdatapb.Rule{{ Match: "dst1", Filter: "select id, val, val as val2, d, j from src1", + }, { + Match: "ast1", + Filter: "select * from ast1", }, { Match: "/yes", }}, @@ -595,9 +603,7 @@ func testPlayerCopyTables(t *testing.T) { } query := binlogplayer.CreateVReplicationState("test", bls, "", binlogdatapb.VReplicationWorkflowState_Init, playerEngine.dbName, 0, 0) qr, err := playerEngine.Exec(query) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer func() { query := fmt.Sprintf("delete from _vt.vreplication where id = %d", qr.InsertID) if _, err := playerEngine.Exec(query); err != nil { @@ -607,15 +613,24 @@ func testPlayerCopyTables(t *testing.T) { }() expectDBClientQueries(t, qh.Expect( - "/insert into _vt.vreplication", + // Filters should be lexicographically ordered by name. + regexp.QuoteMeta("/insert into _vt.vreplication (workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, workflow_type, workflow_sub_type) values ('test', 'keyspace:\\\"vttest\\\" shard:\\\"0\\\" filter:{rules:{match:\\\"ast1\\\" filter:\\\"select * from ast1\\\"} rules:{match:\\\"dst1\\\" filter:\\\"select id, val, val as val2, d, j from src1\\\"} rules:{match:\\\"/yes\\\"}}'"), "/update _vt.vreplication set message='Picked source tablet.*", // Create the list of tables to copy and transition to Copying state. "begin", - "/insert into _vt.copy_state", + // The table names should be lexicographically ordered by name. + fmt.Sprintf("insert into _vt.copy_state(vrepl_id, table_name) values (%d, 'ast1'), (%d, 'dst1'), (%d, 'yes')", qr.InsertID, qr.InsertID, qr.InsertID), "/update _vt.vreplication set state='Copying'", "commit", // The first fast-forward has no starting point. So, it just saves the current position. "/update _vt.vreplication set pos=", + // Now the tables should be copied in lexicographical order: ast1, dst1, yes. + // Nothing to copy from ast1. Delete from copy_state. + "/delete cs, pca from _vt.copy_state as cs left join _vt.post_copy_action as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name.*ast1", + // The next FF executes and updates the position before copying. + "begin", + "/update _vt.vreplication set pos=", + "commit", "begin", "insert into dst1(id,val,val2,d,j) values (1,'aaa','aaa',0,JSON_ARRAY(123456789012345678901234567890, _utf8mb4'abcd')), (2,'bbb','bbb',1,JSON_OBJECT(_utf8mb4'foo', _utf8mb4'bar')), (3,'ccc','ccc',2,CAST(_utf8mb4'null' as JSON)), (4,'ddd','ddd',3,JSON_OBJECT(_utf8mb4'name', _utf8mb4'matt', _utf8mb4'size', null)), (5,'eee','eee',4,null)", `/insert into _vt.copy_state \(lastpk, vrepl_id, table_name\) values \('fields:{name:\\"id\\" type:INT32 charset:63 flags:53251} rows:{lengths:1 values:\\"5\\"}'.*`, From d0cdff0615302472f603ff219ddf729051360662 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 12:03:50 -0500 Subject: [PATCH 05/11] Improve vdiff test failure messages Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vdiff_helper_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index e7d0b714834..91605bff402 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -90,13 +90,14 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cells, uuid string, completedAtMin time.Time) *vdiffInfo { var info *vdiffInfo + var jsonStr string first := true previousProgress := vdiff2.ProgressReport{} ch := make(chan bool) go func() { for { time.Sleep(vdiffStatusCheckInterval) - _, jsonStr := performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false) + _, jsonStr = performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false) info = getVDiffInfo(jsonStr) require.NotNil(t, info) if info.State == "completed" { @@ -142,7 +143,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell case <-ch: return info case <-time.After(vdiffTimeout): - log.Errorf("VDiff never completed for UUID %s", uuid) + log.Errorf("VDiff never completed for UUID %s. Latest output: %s", uuid, jsonStr) require.FailNow(t, fmt.Sprintf("VDiff never completed for UUID %s", uuid)) return nil } From 1fb9738bb940494a22df068749a88e266a83e474 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 12:41:24 -0500 Subject: [PATCH 06/11] Fix unrelated unlock err bug Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/table_differ.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index d658fea2a25..b3634e37ab5 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -112,7 +112,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { defer func() { unlock(&err) if err != nil { - log.Errorf("UnlockKeyspace %s failed: %v", targetKeyspace, lockErr) + log.Errorf("UnlockKeyspace %s failed: %v", targetKeyspace, err) } }() From a9631d234126eabefd5274852cc5ab353cc81122 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 12:53:52 -0500 Subject: [PATCH 07/11] Remove unnecessary/unwanted sort in UpdateVReplicationWorkflow We're only supposed to be potentially updating the OnDDL flag there. Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/rpc_vreplication.go | 1 - 1 file changed, 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index 3082f910b06..ee1907005a8 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -296,7 +296,6 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta if !textutil.ValueIsSimulatedNull(req.OnDdl) { bls.OnDdl = req.OnDdl } - protoutil.SortBinlogSourceTables(bls) source, err = prototext.Marshal(bls) if err != nil { return nil, err From 0bd153a20ba6aadca29b696f59e729abb5eb2c44 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 13:15:28 -0500 Subject: [PATCH 08/11] Add missing sort and fixup tests Signed-off-by: Matt Lord --- go/protoutil/binlogsource.go | 6 ++++++ .../tabletmanager/rpc_vreplication_test.go | 16 ++++++++-------- .../vreplication/insert_generator.go | 2 ++ 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/go/protoutil/binlogsource.go b/go/protoutil/binlogsource.go index 7f37f78b6ab..f890543f485 100644 --- a/go/protoutil/binlogsource.go +++ b/go/protoutil/binlogsource.go @@ -28,8 +28,14 @@ import ( // BinlogSource struct lexicographically by table name in order to // produce consistent results. func SortBinlogSourceTables(bls *binlogdatapb.BinlogSource) { + if bls == nil { + return + } // Sort the tables by name to ensure a consistent order. slices.Sort(bls.Tables) + if bls.Filter == nil || len(bls.Filter.Rules) == 0 { + return + } sort.Slice(bls.Filter.Rules, func(i, j int) bool { // Remove preceding slash from the match string. // That is used when the filter is a regular expression. diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index f760d19d67d..1db4e02b67b 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -556,7 +556,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { } selectQuery, err := parsed.GenerateQuery(bindVars, nil) require.NoError(t, err) - blsStr := fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"customer" filter:"select * from customer"} rules:{match:"corder" filter:"select * from corder"}}`, + blsStr := fmt.Sprintf(`keyspace:"%s" shard:"%s" filter:{rules:{match:"corder" filter:"select * from corder"} rules:{match:"customer" filter:"select * from customer"}}`, keyspace, shard) selectRes := sqltypes.MakeTestResult( sqltypes.MakeTestFields( @@ -589,7 +589,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { Cells: []string{"zone2"}, // TabletTypes is an empty value, so the current value should be cleared }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '' where id in (%d)`, keyspace, shard, "zone2", vreplID), }, { @@ -600,7 +600,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { Cells: []string{"zone3"}, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, // So keep the current value of replica }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, keyspace, shard, "zone3", tabletTypes[0], vreplID), }, { @@ -611,7 +611,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { TabletSelectionPreference: tabletmanagerdatapb.TabletSelectionPreference_INORDER, TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA}, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '', tablet_types = '%s' where id in (%d)`, keyspace, shard, "in_order:rdonly,replica", vreplID), }, { @@ -622,7 +622,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { Cells: textutil.SimulatedNullStringSlice, // So keep the current value of zone1 TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY}, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, keyspace, shard, cells[0], "rdonly", vreplID), }, { @@ -632,7 +632,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { State: binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt), OnDdl: binlogdatapb.OnDDLAction_EXEC, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}} on_ddl:%s', cell = '', tablet_types = '' where id in (%d)`, keyspace, shard, binlogdatapb.OnDDLAction_EXEC.String(), vreplID), }, { @@ -644,7 +644,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { TabletTypes: []topodatapb.TabletType{topodatapb.TabletType_RDONLY, topodatapb.TabletType_REPLICA, topodatapb.TabletType_PRIMARY}, OnDdl: binlogdatapb.OnDDLAction_EXEC_IGNORE, }, - query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Running', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}} on_ddl:%s', cell = '%s', tablet_types = '%s' where id in (%d)`, keyspace, shard, binlogdatapb.OnDDLAction_EXEC_IGNORE.String(), "zone1,zone2,zone3", "rdonly,replica,primary", vreplID), }, { @@ -656,7 +656,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt), }, - query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID), }, } diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go index da1753a8444..1ab45d2414d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go @@ -21,6 +21,7 @@ import ( "strings" "time" + "vitess.io/vitess/go/protoutil" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/throttler" ) @@ -50,6 +51,7 @@ func NewInsertGenerator(state binlogdatapb.VReplicationWorkflowState, dbname str // AddRow adds a row to the insert statement. func (ig *InsertGenerator) AddRow(workflow string, bls *binlogdatapb.BinlogSource, pos, cell, tabletTypes string, workflowType binlogdatapb.VReplicationWorkflowType, workflowSubType binlogdatapb.VReplicationWorkflowSubType, deferSecondaryKeys bool) { + protoutil.SortBinlogSourceTables(bls) fmt.Fprintf(ig.buf, "%s(%v, %v, %v, %v, %v, %v, %v, %v, 0, '%v', %v, %d, %d, %v)", ig.prefix, encodeString(workflow), From 024fd02844bec815f32715adeaec7b6ffde3fb34 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 13:24:30 -0500 Subject: [PATCH 09/11] Improve unit test Signed-off-by: Matt Lord --- go/protoutil/binlogsource_test.go | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/go/protoutil/binlogsource_test.go b/go/protoutil/binlogsource_test.go index 7429a27a8fa..438f6e0657d 100644 --- a/go/protoutil/binlogsource_test.go +++ b/go/protoutil/binlogsource_test.go @@ -78,11 +78,36 @@ func TestSortBinlogSourceTables(t *testing.T) { }, }, }, + { + name: "Nil", + inSource: nil, + outSource: nil, + }, + { + name: "No filter", + inSource: &binlogdatapb.BinlogSource{ + Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"}, + Filter: nil, + }, + outSource: &binlogdatapb.BinlogSource{ + Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"}, + Filter: nil, + }, + }, + { + name: "No filter rules", + inSource: &binlogdatapb.BinlogSource{ + Tables: []string{"wuts1", "atable", "1table", "ztable2", "table3"}, + Filter: &binlogdatapb.Filter{}, + }, + outSource: &binlogdatapb.BinlogSource{ + Tables: []string{"1table", "atable", "table3", "wuts1", "ztable2"}, + Filter: &binlogdatapb.Filter{}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - require.NotNil(t, tt.inSource, "no input source provided") - require.NotNil(t, tt.outSource, "no output source provided") SortBinlogSourceTables(tt.inSource) require.True(t, proto.Equal(tt.inSource, tt.outSource)) }) From 7709300b7379bbdd0c017f6d2ef42c862a72da4f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 15:21:26 -0500 Subject: [PATCH 10/11] Improve sorting and tests Signed-off-by: Matt Lord --- go/protoutil/binlogsource.go | 16 +++- go/protoutil/binlogsource_test.go | 96 ++++++++++++++++++- go/vt/vtctl/workflow/resharder.go | 9 +- .../vreplication/insert_generator.go | 3 +- 4 files changed, 119 insertions(+), 5 deletions(-) diff --git a/go/protoutil/binlogsource.go b/go/protoutil/binlogsource.go index f890543f485..385f472c202 100644 --- a/go/protoutil/binlogsource.go +++ b/go/protoutil/binlogsource.go @@ -31,16 +31,30 @@ func SortBinlogSourceTables(bls *binlogdatapb.BinlogSource) { if bls == nil { return } + // Sort the tables by name to ensure a consistent order. slices.Sort(bls.Tables) + if bls.Filter == nil || len(bls.Filter.Rules) == 0 { return } sort.Slice(bls.Filter.Rules, func(i, j int) bool { + // Exclude filters should logically be processed first. + if bls.Filter.Rules[i].Filter == "exclude" && bls.Filter.Rules[j].Filter != "exclude" { + return true + } + if bls.Filter.Rules[j].Filter == "exclude" && bls.Filter.Rules[i].Filter != "exclude" { + return false + } + // Remove preceding slash from the match string. // That is used when the filter is a regular expression. fi, _ := strings.CutPrefix(bls.Filter.Rules[i].Match, "/") fj, _ := strings.CutPrefix(bls.Filter.Rules[j].Match, "/") - return fi < fj + if fi != fj { + return fi < fj + } + + return bls.Filter.Rules[i].Filter < bls.Filter.Rules[j].Filter }) } diff --git a/go/protoutil/binlogsource_test.go b/go/protoutil/binlogsource_test.go index 438f6e0657d..fe5564535bd 100644 --- a/go/protoutil/binlogsource_test.go +++ b/go/protoutil/binlogsource_test.go @@ -46,6 +46,10 @@ func TestSortBinlogSourceTables(t *testing.T) { { Match: "/wuts", }, + { + Match: "1table", + Filter: "a", + }, { Match: "1table", }, @@ -62,6 +66,10 @@ func TestSortBinlogSourceTables(t *testing.T) { { Match: "1table", }, + { + Match: "1table", + Filter: "a", + }, { Match: "atable", }, @@ -78,6 +86,92 @@ func TestSortBinlogSourceTables(t *testing.T) { }, }, }, + { + name: "With excludes", + inSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "./*", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "ztable2", + }, + { + Match: "atable2", + }, + }, + }, + }, + outSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "./*", + }, + { + Match: "atable2", + }, + { + Match: "ztable2", + }, + }, + }, + }, + }, + { + name: "With excludes", + inSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "./*", + }, + }, + }, + }, + outSource: &binlogdatapb.BinlogSource{ + Filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{ + { + Match: "no2", + Filter: "exclude", + }, + { + Match: "no4", + Filter: "exclude", + }, + { + Match: "./*", + }, + }, + }, + }, + }, { name: "Nil", inSource: nil, @@ -109,7 +203,7 @@ func TestSortBinlogSourceTables(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { SortBinlogSourceTables(tt.inSource) - require.True(t, proto.Equal(tt.inSource, tt.outSource)) + require.True(t, proto.Equal(tt.inSource, tt.outSource), "got: %s, want: %s", tt.inSource.String(), tt.outSource.String()) }) } } diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index e36b546c1d2..ce0e0a13eb9 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "slices" "sync" "time" @@ -29,6 +30,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topotools" @@ -281,8 +283,8 @@ func (rs *resharder) createStreams(ctx context.Context) error { ig := vreplication.NewInsertGenerator(binlogdatapb.VReplicationWorkflowState_Stopped, targetPrimary.DbName()) - // copy excludeRules to prevent data race. - copyExcludeRules := append([]*binlogdatapb.Rule(nil), excludeRules...) + // Clone excludeRules to prevent data races. + copyExcludeRules := slices.Clone(excludeRules) for _, source := range rs.sourceShards { if !key.KeyRangeIntersect(target.KeyRange, source.KeyRange) { continue @@ -307,13 +309,16 @@ func (rs *resharder) createStreams(ctx context.Context) error { } for _, rstream := range rs.refStreams { + log.Errorf("DEBUG: Before AddRow: %s", rstream.bls.String()) ig.AddRow(rstream.workflow, rstream.bls, "", rstream.cell, rstream.tabletTypes, // TODO: fix based on original stream. binlogdatapb.VReplicationWorkflowType_Reshard, binlogdatapb.VReplicationWorkflowSubType_None, rs.deferSecondaryKeys) + log.Errorf("DEBUG: After AddRow: %s", rstream.bls.String()) } query := ig.String() + log.Errorf("DEBUG: Full statement: %s", query) if _, err := rs.s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil { return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go index 1ab45d2414d..62af8c9396d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/insert_generator.go @@ -22,8 +22,9 @@ import ( "time" "vitess.io/vitess/go/protoutil" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/throttler" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) // InsertGenerator generates a vreplication insert statement. From 84a2ef2ade2d39e139fc7fad41117e06ba724f6b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 16:09:34 -0500 Subject: [PATCH 11/11] Minor changes after self review Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/resharder.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index ce0e0a13eb9..18f10f25319 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -30,7 +30,6 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/key" - "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topotools" @@ -309,16 +308,13 @@ func (rs *resharder) createStreams(ctx context.Context) error { } for _, rstream := range rs.refStreams { - log.Errorf("DEBUG: Before AddRow: %s", rstream.bls.String()) ig.AddRow(rstream.workflow, rstream.bls, "", rstream.cell, rstream.tabletTypes, // TODO: fix based on original stream. binlogdatapb.VReplicationWorkflowType_Reshard, binlogdatapb.VReplicationWorkflowSubType_None, rs.deferSecondaryKeys) - log.Errorf("DEBUG: After AddRow: %s", rstream.bls.String()) } query := ig.String() - log.Errorf("DEBUG: Full statement: %s", query) if _, err := rs.s.tmc.VReplicationExec(ctx, targetPrimary.Tablet, query); err != nil { return vterrors.Wrapf(err, "VReplicationExec(%v, %s)", targetPrimary.Tablet, query) }