From 02b7f06c3839f286b73654ae5923056b92a98328 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 9 Jan 2025 10:40:10 -0500 Subject: [PATCH] Add PKE handling for source as well Signed-off-by: Matt Lord --- .../tabletmanager/vdiff/table_differ.go | 35 +++++++++++++------ .../tabletmanager/vdiff/workflow_differ.go | 1 - 2 files changed, 24 insertions(+), 12 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 359f18a3b3f..a98af2d54fa 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -20,7 +20,6 @@ import ( "context" "encoding/json" "fmt" - "slices" "strings" "sync" "time" @@ -36,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -746,7 +746,6 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D if err != nil { return err } - log.Errorf("DEBUG: lastpk for table %s to DB: %s", td.table.Name, string(lastPKTxt)) query, err = sqlparser.ParseAndBind(sqlUpdateTableProgress, sqltypes.Int64BindVariable(dr.ProcessedRows), sqltypes.StringBindVariable(string(lastPKTxt)), @@ -826,7 +825,6 @@ func updateTableMismatch(dbClient binlogplayer.DBClient, vdiffID int64, table st } func (td *tableDiffer) lastPKFromRow(row []sqltypes.Value) *tabletmanagerdatapb.VDiffTableLastPK { - var source, target *querypb.QueryResult buildQR := func(pkCols []int) *querypb.QueryResult { pkColCnt := len(pkCols) pkFields := make([]*querypb.Field, pkColCnt) @@ -840,15 +838,9 @@ func (td *tableDiffer) lastPKFromRow(row []sqltypes.Value) *tabletmanagerdatapb. Rows: []*querypb.Row{sqltypes.RowToProto3(pkVals)}, } } - target = buildQR(td.tablePlan.pkCols) - if len(td.tablePlan.sourcePkCols) == 0 || slices.Equal(td.tablePlan.sourcePkCols, td.tablePlan.pkCols) { - source = target - } else { - source = buildQR(td.tablePlan.sourcePkCols) - } return &tabletmanagerdatapb.VDiffTableLastPK{ - Source: source, - Target: target, + Source: buildQR(td.tablePlan.sourcePkCols), + Target: buildQR(td.tablePlan.pkCols), } } @@ -905,6 +897,9 @@ func (td *tableDiffer) getSourcePKCols() error { ctx, cancel := context.WithTimeout(td.wd.ct.vde.ctx, topo.RemoteOperationTimeout*3) defer cancel() // We use the first sourceShard as all of them should have the same schema. + if len(td.wd.ct.sources) == 0 { + return fmt.Errorf("no source shards found") + } sourceShardName := maps.Keys(td.wd.ct.sources)[0] sourceTS, err := td.wd.getSourceTopoServer() if err != nil { @@ -928,6 +923,24 @@ func (td *tableDiffer) getSourcePKCols() error { return err } sourceTable := sourceSchema.TableDefinitions[0] + if len(sourceTable.PrimaryKeyColumns) == 0 { + // We use the columns from a PKE if there is one. + executeFetch := func(query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { + res, err := td.wd.ct.tmc.ExecuteFetchAsApp(ctx, sourceTablet.Tablet, false, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + Query: []byte(query), + MaxRows: 1, + }) + if err != nil { + return nil, err + } + return sqltypes.Proto3ToResult(res), nil + } + pkeCols, _, err := mysqlctl.GetPrimaryKeyEquivalentColumns(ctx, executeFetch, td.wd.ct.sourceKeyspace, td.table.Name) + if err != nil { + return err + } + sourceTable.PrimaryKeyColumns = pkeCols + } sourcePKColumns := make(map[string]struct{}, len(sourceTable.PrimaryKeyColumns)) td.tablePlan.sourcePkCols = make([]int, 0, len(sourceTable.PrimaryKeyColumns)) for _, pkc := range sourceTable.PrimaryKeyColumns { diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index fef0bf7a1d3..3315607dc9e 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -416,7 +416,6 @@ func (wd *workflowDiffer) getTableLastPK(dbClient binlogplayer.DBClient, tableNa if lastpk, err = qr.Named().Row().ToBytes("lastpk"); err != nil { return nil, err } - log.Errorf("DEBUG: lastpk for table %s from DB: %s", tableName, string(lastpk)) if len(lastpk) != 0 { lastPK := &tabletmanagerdatapb.VDiffTableLastPK{} if err := prototext.Unmarshal(lastpk, lastPK); err != nil {