Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: Support a max diff time for tables #14786

Merged
merged 22 commits into from
Dec 27, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Previous commit
Next commit
More work
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Dec 14, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature.
commit aeb72e10dca68250f227a50259a1e2b3d619cd5e
15 changes: 12 additions & 3 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
@@ -52,6 +52,8 @@ import (
// BackgroundOperationTimeout is how long to wait for background operations to
// complete. It is four times topo.RemoteOperationTimeout.
var BackgroundOperationTimeout = topo.RemoteOperationTimeout * 4

// ErrMaxDiffDurationExceeded is the error reported when a table diff is
// stopped because it exceeded the max-diff-duration time. It carries the
// DEADLINE_EXCEEDED code so callers can tell it apart from a user-initiated
// stop.
var ErrMaxDiffDurationExceeded = vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "table diff was stopped due to exceeding the max-diff-duration time")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice


// compareColInfo contains the metadata for a column of the table being diffed
type compareColInfo struct {
colIndex int // index of the column in the filter's select
@@ -522,7 +524,7 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl
case <-td.wd.ct.done:
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped by user")
case <-stop:
return nil, vterrors.Errorf(vtrpcpb.Code_CANCELED, "vdiff was stopped due to max execution time")
return nil, ErrMaxDiffDurationExceeded
default:
}

@@ -535,7 +537,7 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl
}
rowsToCompare--
if rowsToCompare < 0 {
log.Infof("Stopping vdiff, specified limit reached")
log.Infof("Stopping vdiff, specified row limit reached")
return dr, nil
}
if advanceSource {
@@ -566,7 +568,7 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl
}
dr.ExtraRowsTargetDiffs = append(dr.ExtraRowsTargetDiffs, diffRow)

// drain target, update count
// Drain target, update count.
count, err := targetExecutor.drain(ctx)
if err != nil {
return nil, err
@@ -701,6 +703,13 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D
if err != nil {
return err
}
// Update the in-memory lastPK as well so that we can restart the table
// diff if --max-diff-duration was specified.
lastpkpb := &querypb.QueryResult{}
if err := prototext.Unmarshal(lastPK, lastpkpb); err != nil {
return err
}
td.lastPK = lastpkpb

query, err = sqlparser.ParseAndBind(sqlUpdateTableProgress,
sqltypes.Int64BindVariable(dr.ProcessedRows),
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/table_plan.go
Original file line number Diff line number Diff line change
@@ -167,8 +167,8 @@ func (td *tableDiffer) buildTablePlan(dbClient binlogplayer.DBClient, dbName str

tp.sourceQuery = sqlparser.String(sourceSelect)
tp.targetQuery = sqlparser.String(targetSelect)
log.Info("VDiff query on source: %v", tp.sourceQuery)
log.Info("VDiff query on target: %v", tp.targetQuery)
log.Infof("VDiff query on source: %v", tp.sourceQuery)
log.Infof("VDiff query on target: %v", tp.targetQuery)

tp.aggregates = aggregates
td.tablePlan = tp
40 changes: 23 additions & 17 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@ package vdiff

import (
"context"
"errors"
"fmt"
"reflect"
"strings"
@@ -154,6 +155,17 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
}
}()

maxDiffRuntime := time.Duration(24 * time.Hour * 365) // 1 year (effectively forever)
if wd.ct.options.CoreOptions.MaxDiffSeconds > 0 {
// Restart the diff if it takes longer than the specified max diff time.
maxDiffRuntime = time.Duration(wd.ct.options.CoreOptions.MaxDiffSeconds) * time.Second
}

log.Infof("Starting differ on table %s for vdiff %s", td.table.Name, wd.ct.uuid)
if err := td.updateTableState(ctx, dbClient, StartedState); err != nil {
return err
}

for {
select {
case <-ctx.Done():
@@ -163,32 +175,26 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D
default:
}

// Restart the diff if it takes longer than the specified max diff time.
if wd.ct.options != nil && wd.ct.options.CoreOptions != nil && wd.ct.options.CoreOptions.MaxDiffSeconds > 0 {
if timer != nil { // We're restarting the diff
timer.Stop()
timer = nil
// Give the underlying resources (mainly MySQL) a moment to catch up.
time.Sleep(30 * time.Second)
}
timer = time.NewTimer(time.Duration(wd.ct.options.CoreOptions.MaxDiffSeconds) * time.Second)
} else {
timer = time.NewTimer(24 * time.Hour * 365) // 1 year (effectively forever)
if timer != nil { // We're restarting the diff
timer.Stop()
timer = nil
// Give the underlying resources (mainly MySQL) a moment to catch up.
time.Sleep(30 * time.Second)
}
timer = time.NewTimer(maxDiffRuntime)

log.Infof("Starting differ on table %s for vdiff %s", td.table.Name, wd.ct.uuid)
if err := td.updateTableState(ctx, dbClient, StartedState); err != nil {
return err
}
if err := td.initialize(ctx); err != nil {
return err
}
log.Infof("Table initialization done on table %s for vdiff %s", td.table.Name, wd.ct.uuid)
dr, diffErr = td.diff(ctx, wd.opts.CoreOptions.MaxRows, wd.opts.ReportOptions.DebugQuery, wd.opts.ReportOptions.OnlyPks, wd.opts.CoreOptions.MaxExtraRowsToCompare, wd.opts.ReportOptions.MaxSampleRows, timer.C)
if diffErr == nil {
log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, diffErr)
if diffErr == nil { // We finished the diff successfully
break
}
log.Errorf("Encountered an error diffing table %s for vdiff %s: %v", td.table.Name, wd.ct.uuid, diffErr)
if !errors.Is(diffErr, ErrMaxDiffDurationExceeded) { // We only want to retry if we hit the max-diff-duration
return diffErr
}
}
log.Infof("Table diff done on table %s for vdiff %s with report: %+v", td.table.Name, wd.ct.uuid, dr)