Skip to content

Commit

Permalink
VDiff: wait for shard streams of one table diff to complete for befor…
Browse files Browse the repository at this point in the history
…e starting that of the next table (#14345)

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
Signed-off-by: Matt Lord <mattalord@gmail.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
rohit-nayak-ps and mattlord authored Oct 27, 2023
1 parent 878378b commit 54f2daf
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 17 deletions.
2 changes: 1 addition & 1 deletion go/vt/sidecardb/schema/vdiff/vdiff.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ CREATE TABLE IF NOT EXISTS vdiff
`started_at` timestamp NULL DEFAULT NULL,
`liveness_timestamp` timestamp NULL DEFAULT NULL,
`completed_at` timestamp NULL DEFAULT NULL,
`last_error` varbinary(512) DEFAULT NULL,
`last_error` varbinary(1024) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `uuid_idx` (`vdiff_uuid`),
KEY `state` (`state`),
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vttablet/tabletmanager/vdiff/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestEngineOpen(t *testing.T) {
), nil)

// Now let's short circuit the vdiff as we know that the open has worked as expected.
shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient)
shortCircuitTestAfterQuery("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", vdiffenv.dbClient)

vdenv.vde.Open(context.Background(), vdiffenv.vre)
defer vdenv.vde.Close()
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestVDiff(t *testing.T) {
),
fmt.Sprintf("1|%s|%s|%s||9223372036854775807|9223372036854775807||PRIMARY,REPLICA|1669511347|0|Running||%s|200||1669511347|1|0||1", vdiffenv.workflow, vreplSource, vdiffSourceGtid, vdiffDBName),
), nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: started')", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest(`select vdt.lastpk as lastpk, vdt.mismatch as mismatch, vdt.report as report
from _vt.vdiff as vd inner join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
Expand Down Expand Up @@ -194,7 +194,7 @@ func TestVDiff(t *testing.T) {
vdenv.dbClient.ExpectRequest("update _vt.vdiff_table set state = 'completed' where vdiff_id = 1 and table_name = 't1'", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest(`insert into _vt.vdiff_log(vdiff_id, message) values (1, 'completed: table \'t1\'')`, singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("select table_name as table_name from _vt.vdiff_table where vdiff_id = 1 and state != 'completed'", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = '' , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("update _vt.vdiff set state = 'completed', last_error = left('', 1024) , completed_at = utc_timestamp() where id = 1", singleRowAffected, nil)
vdenv.dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: completed')", singleRowAffected, nil)

vdenv.vde.mu.Lock()
Expand Down Expand Up @@ -271,7 +271,7 @@ func TestEngineRetryErroredVDiffs(t *testing.T) {
), nil)

// At this point we know that we kicked off the expected retry so we can short circit the vdiff.
shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = '' , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient)
shortCircuitTestAfterQuery(fmt.Sprintf("update _vt.vdiff set state = 'started', last_error = left('', 1024) , started_at = utc_timestamp() where id = %s", id), vdiffenv.dbClient)

expectedControllerCnt++
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func resetBinlogClient() {
// has verified the necessary behavior.
func shortCircuitTestAfterQuery(query string, dbClient *binlogplayer.MockDBClient) {
dbClient.ExpectRequest(query, singleRowAffected, fmt.Errorf("Short circuiting test"))
dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = 'Short circuiting test' where id = 1", singleRowAffected, nil)
dbClient.ExpectRequest("update _vt.vdiff set state = 'error', last_error = left('Short circuiting test', 1024) where id = 1", singleRowAffected, nil)
dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'State changed to: error')", singleRowAffected, nil)
dbClient.ExpectRequest("insert into _vt.vdiff_log(vdiff_id, message) values (1, 'Error: Short circuiting test')", singleRowAffected, nil)
}
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ const (
IF(vdt.mismatch = 1, 1, 0) as has_mismatch, vdt.report as report
from _vt.vdiff as vd left join _vt.vdiff_table as vdt on (vd.id = vdt.vdiff_id)
where vd.id = %a`
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = %s %s where id = %d"
// sqlUpdateVDiffState has a penultimate placeholder for any additional columns you want to update, e.g. `, foo = 1`.
// It also truncates the error if needed to ensure that we can save the state when the error text is very long.
sqlUpdateVDiffState = "update _vt.vdiff set state = %s, last_error = left(%s, 1024) %s where id = %d"
sqlUpdateVDiffStopped = `update _vt.vdiff as vd, _vt.vdiff_table as vdt set vd.state = 'stopped', vdt.state = 'stopped', vd.last_error = ''
where vd.id = vdt.vdiff_id and vd.id = %a and vd.state != 'completed'`
sqlGetVReplicationEntry = "select * from _vt.vreplication %s"
Expand Down
29 changes: 20 additions & 9 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,19 @@ import (
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

// how long to wait for background operations to complete
Expand All @@ -72,6 +73,12 @@ type tableDiffer struct {
sourceQuery string
table *tabletmanagerdatapb.TableDefinition
lastPK *querypb.QueryResult

// wgShardStreamers is used, with a cancellable context, to wait for all shard streamers
// to finish after each diff is complete.
wgShardStreamers sync.WaitGroup
shardStreamsCtx context.Context
shardStreamsCancel context.CancelFunc
}

func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefinition, sourceQuery string) *tableDiffer {
Expand Down Expand Up @@ -121,19 +128,21 @@ func (td *tableDiffer) initialize(ctx context.Context) error {
}
}()

td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx)

if err := td.selectTablets(ctx); err != nil {
return err
}
if err := td.syncSourceStreams(ctx); err != nil {
return err
}
if err := td.startSourceDataStreams(ctx); err != nil {
if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil {
return err
}
if err := td.syncTargetStreams(ctx); err != nil {
return err
}
if err := td.startTargetDataStream(ctx); err != nil {
if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil {
return err
}
td.setupRowSorters()
Expand Down Expand Up @@ -203,7 +212,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
var (
wg sync.WaitGroup
sourceErr, targetErr error
targetTablet *topodata.Tablet
targetTablet *topodatapb.Tablet
)

// The cells from the vdiff record are a comma separated list.
Expand Down Expand Up @@ -254,7 +263,7 @@ func (td *tableDiffer) selectTablets(ctx context.Context) error {
return targetErr
}

func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodata.Tablet, error) {
func pickTablet(ctx context.Context, ts *topo.Server, cells []string, localCell, keyspace, shard, tabletTypes string) (*topodatapb.Tablet, error) {
tp, err := discovery.NewTabletPicker(ctx, ts, cells, localCell, keyspace, shard, tabletTypes, discovery.TabletPickerOptions{})
if err != nil {
return nil, err
Expand Down Expand Up @@ -354,10 +363,12 @@ func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) err

func (td *tableDiffer) streamOneShard(ctx context.Context, participant *shardStreamer, query string, lastPK *querypb.QueryResult, gtidch chan string) {
log.Infof("streamOneShard Start on %s using query: %s", participant.tablet.Alias.String(), query)
td.wgShardStreamers.Add(1)
defer func() {
log.Infof("streamOneShard End on %s", participant.tablet.Alias.String())
close(participant.result)
close(gtidch)
td.wgShardStreamers.Done()
}()
participant.err = func() error {
conn, err := tabletconn.GetDialer()(participant.tablet, false)
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ func (wd *workflowDiffer) reconcileExtraRows(dr *DiffReport, maxExtraRowsToCompa
}

func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.DBClient, td *tableDiffer) error {
defer func() {
if td.shardStreamsCancel != nil {
td.shardStreamsCancel()
}
// Wait for all the shard streams to finish before returning.
td.wgShardStreamers.Wait()
}()

select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
Expand Down

0 comments on commit 54f2daf

Please sign in to comment.