diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication.go b/go/vt/vttablet/tabletmanager/rpc_vreplication.go index e16968fc5b9..2ae98855290 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication.go @@ -18,6 +18,7 @@ package tabletmanager import ( "context" + "fmt" "strings" "google.golang.org/protobuf/encoding/prototext" @@ -49,6 +50,8 @@ const ( sqlSelectVReplicationWorkflowConfig = "select id, source, cell, tablet_types, state, message from %s.vreplication where workflow = %a" // Update the configuration values for a workflow's vreplication stream. sqlUpdateVReplicationWorkflowStreamConfig = "update %s.vreplication set state = %a, source = %a, cell = %a, tablet_types = %a where id = %a" + // Check if workflow is still copying. + sqlGetVReplicationCopyStatus = "select distinct vrepl_id from %s.copy_state where vrepl_id = %d" ) func (tm *TabletManager) CreateVReplicationWorkflow(ctx context.Context, req *tabletmanagerdatapb.CreateVReplicationWorkflowRequest) (*tabletmanagerdatapb.CreateVReplicationWorkflowResponse, error) { @@ -227,6 +230,18 @@ func (tm *TabletManager) ReadVReplicationWorkflow(ctx context.Context, req *tabl return resp, nil } +func isStreamCopying(tm *TabletManager, id int64) (bool, error) { + query := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), id) + res, err := tm.VREngine.Exec(query) + if err != nil { + return false, err + } + if res != nil && len(res.Rows) > 0 { + return true, nil + } + return false, nil +} + // UpdateVReplicationWorkflow updates the sidecar databases's vreplication // record(s) for this tablet's vreplication workflow stream(s). If there // are no streams for the given workflow on the tablet then a nil result @@ -302,6 +317,17 @@ func (tm *TabletManager) UpdateVReplicationWorkflow(ctx context.Context, req *ta if !textutil.ValueIsSimulatedNull(req.State) { state = binlogdatapb.VReplicationWorkflowState_name[int32(req.State)] } + if state == binlogdatapb.VReplicationWorkflowState_Running.String() { + // `Workflow Start` sets the new state to Running. However, if stream is still copying tables, we should set + // the state as Copying. + isCopying, err := isStreamCopying(tm, id) + if err != nil { + return nil, err + } + if isCopying { + state = binlogdatapb.VReplicationWorkflowState_Copying.String() + } + } bindVars = map[string]*querypb.BindVariable{ "st": sqltypes.StringBindVariable(state), "sc": sqltypes.StringBindVariable(string(source)), diff --git a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go index ef2ac591c2b..0f403b0d958 100644 --- a/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_vreplication_test.go @@ -489,10 +489,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { fmt.Sprintf("%d", vreplID), ) + getCopyStateQuery := fmt.Sprintf(sqlGetVReplicationCopyStatus, sidecar.GetIdentifier(), int64(vreplID)) + copyStatusFields := sqltypes.MakeTestFields( + "id", + "int64", + ) + notCopying := sqltypes.MakeTestResult(copyStatusFields) + copying := sqltypes.MakeTestResult(copyStatusFields, "1") + tests := []struct { - name string - request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest - query string + name string + request *tabletmanagerdatapb.UpdateVReplicationWorkflowRequest + query string + isCopying bool }{ { name: "update cells", @@ -572,6 +581,19 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { query: fmt.Sprintf(`update _vt.vreplication set state = '%s', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, binlogdatapb.VReplicationWorkflowState_Stopped.String(), keyspace, shard, cells[0], tabletTypes[0], vreplID), }, + { + name: "update to running while copying", + request: &tabletmanagerdatapb.UpdateVReplicationWorkflowRequest{ + Workflow: workflow, + State: binlogdatapb.VReplicationWorkflowState_Running, + Cells: textutil.SimulatedNullStringSlice, + TabletTypes: []topodatapb.TabletType{topodatapb.TabletType(textutil.SimulatedNullInt)}, + OnDdl: binlogdatapb.OnDDLAction(textutil.SimulatedNullInt), + }, + isCopying: true, + query: fmt.Sprintf(`update _vt.vreplication set state = 'Copying', source = 'keyspace:\"%s\" shard:\"%s\" filter:{rules:{match:\"customer\" filter:\"select * from customer\"} rules:{match:\"corder\" filter:\"select * from corder\"}}', cell = '%s', tablet_types = '%s' where id in (%d)`, + keyspace, shard, cells[0], tabletTypes[0], vreplID), + }, } for _, tt := range tests { @@ -590,6 +612,17 @@ func TestUpdateVReplicationWorkflow(t *testing.T) { // These are the same for each RPC call. tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil) tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(selectQuery, selectRes, nil) + + if tt.request.State == binlogdatapb.VReplicationWorkflowState_Running || + tt.request.State == binlogdatapb.VReplicationWorkflowState(textutil.SimulatedNullInt) { + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.GetIdentifier()), &sqltypes.Result{}, nil) + if tt.isCopying { + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, copying, nil) + } else { + tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(getCopyStateQuery, notCopying, nil) + + } + } tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(fmt.Sprintf("use %s", sidecar.DefaultName), &sqltypes.Result{}, nil) tenv.tmc.tablets[tabletUID].vrdbClient.ExpectRequest(idQuery, idRes, nil)