Skip to content

Commit 9104739

Browse files
[release-20.0-rc] VReplication Workflow: set state correctly when restarting workflow streams in the copy phase (#16217) (#16223)
Signed-off-by: Rohit Nayak <rohit@planetscale.com> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Rohit Nayak <rohit@planetscale.com>
1 parent ff04f95 commit 9104739

File tree

2 files changed

+64
-6
lines changed

2 files changed

+64
-6
lines changed

go/vt/vttablet/tabletmanager/rpc_vreplication.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ const (
5959
// Update field values for multiple workflows. The final format specifier is
6060
// used to optionally add any additional predicates to the query.
6161
sqlUpdateVReplicationWorkflows = "update /*vt+ ALLOW_UNSAFE_VREPLICATION_WRITE */ %s.vreplication set%s where db_name = '%s'%s"
62+
// Check if workflow is still copying.
63+
sqlGetVReplicationCopyStatus = "select distinct vrepl_id from %s.copy_state where vrepl_id = %d"
6264
)
6365

6466
var (
@@ -379,6 +381,18 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl
379381
return resp, nil
380382
}
381383

384+
func isStreamCopying(tm *TabletManager, id int64) (bool, error) {
385+
query := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), id)
386+
res, err := tm.VREngine.Exec(query)
387+
if err != nil {
388+
return false, err
389+
}
390+
if res != nil && len(res.Rows) > 0 {
391+
return true, nil
392+
}
393+
return false, nil
394+
}
395+
382396
// UpdateVReplicationWorkflow updates the sidecar database's vreplication
383397
// record(s) for this tablet's vreplication workflow stream(s). If there
384398
// are no streams for the given workflow on the tablet then a nil result
@@ -457,6 +471,17 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta
457471
if !textutil.ValueIsSimulatedNull(req.State) {
458472
state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)]
459473
}
474+
if state == binlogdatapb.VReplicationWorkflowState_Running.String() {
475+
// `Workflow Start` sets the new state to Running. However, if the stream is still copying tables, we should set
476+
// the state as Copying.
477+
isCopying, err := isStreamCopying(tm, id)
478+
if err != nil {
479+
return nil, err
480+
}
481+
if isCopying {
482+
state = binlogdatapb.VReplicationWorkflowState_Copying.String()
483+
}
484+
}
460485
bindVars = map[string]*querypb.BindVariable{
461486
"st": sqltypes.StringBindVariable(state),
462487
"sc": sqltypes.StringBindVariable(string(source)),

go/vt/vttablet/tabletmanager/rpc_vreplication_test.go

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,8 @@ func TestMoveTables(t *testing.T) {
390390

391391
for _, ftc := range targetShards {
392392
addInvariants(ftc.vrdbClient, vreplID, sourceTabletUID, position, wf, tenv.cells[0])
393-
393+
getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), vreplID)
394+
ftc.vrdbClient.AddInvariant(getCopyStateQuery, &sqltypes.Result{})
394395
tenv.tmc.setVReplicationExecResults(ftc.tablet, getCopyState, &sqltypes.Result{})
395396
ftc.vrdbClient.ExpectRequest(fmt.Sprintf(readAllWorkflows, tenv.dbName, ""), &sqltypes.Result{}, nil)
396397
insert := fmt.Sprintf(`%s values ('%s', 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"t1\" filter:\"select * from t1 where in_keyrange(id, \'%s.hash\', \'%s\')\"}}', '', 0, 0, '%s', 'primary,replica,rdonly', now(), 0, 'Stopped', '%s', %d, 0, 0, '{}')`,
@@ -574,6 +575,7 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
574575
),
575576
fmt.Sprintf("%d|%s|%s|%s|Running|", vreplID, blsStr, cells[0], tabletTypes[0]),
576577
)
578+
577579
idQuery, err := sqlparser.ParseAndBind("select id from _vt.vreplication where id = %a",
578580
sqltypes.Int64BindVariable(int64(vreplID)))
579581
require.NoError(t, err)
@@ -585,10 +587,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
585587
fmt.Sprintf("%d", vreplID),
586588
)
587589

590+
getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), int64(vreplID))
591+
copyStatusFields := sqltypes.MakeTestFields(
592+
"id",
593+
"int64",
594+
)
595+
notCopying := sqltypes.MakeTestResult(copyStatusFields)
596+
copying := sqltypes.MakeTestResult(copyStatusFields, "1")
597+
588598
tests := []struct {
589-
name string
590-
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
591-
query string
599+
name string
600+
request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest
601+
query string
602+
isCopying bool
592603
}{
593604
{
594605
name: "update cells",
@@ -668,6 +679,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
668679
query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
669680
binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID),
670681
},
682+
{
683+
name: "update to running while copying",
684+
request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{
685+
Workflow: workflow,
686+
State: binlogdatapb.VReplicationWorkflowState_Running,
687+
Cells: textutil.SimulatedNullStringSlice,
688+
TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)},
689+
OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt),
690+
},
691+
isCopying: true,
692+
query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"corder\" filter:\"select * from corder\"} rules:{match:\"customer\" filter:\"select * from customer\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`,
693+
keyspace, shard, cells[0], tabletTypes[0], vreplID),
694+
},
671695
}
672696

673697
for _, tt := range tests {
@@ -686,12 +710,21 @@ func TestUpdateVReplicationWorkflow(t *testing.T) {
686710
// These are the same for each RPC call.
687711
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
688712
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil)
689-
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
690-
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil)
713+
if tt.request.State == binlogdatapb.VReplicationWorkflowState_Running ||
714+
tt.request.State == binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt) {
715+
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
716+
if tt.isCopying {
717+
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, copying, nil)
718+
} else {
719+
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, notCopying, nil)
691720

721+
}
722+
}
692723
// This is our expected query, which will also short circuit
693724
// the test with an error as at this point we've tested what
694725
// we wanted to test.
726+
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil)
727+
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil)
695728
tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(tt.query, &sqltypes.Result{RowsAffected: 1}, errShortCircuit)
696729
_, err = tenv.tmc.tablets[tabletUID].tm.UpdateVReplicationWorkflow(ctx, tt.request)
697730
tenv.tmc.tablets[tabletUID].vrdbClient.Wait()

0 commit comments

Comments
 (0)