Skip to content

Commit

Permalink
More refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Aug 21, 2024
1 parent 93a7d82 commit 8ca8200
Showing 1 changed file with 86 additions and 1 deletion.
87 changes: 86 additions & 1 deletion go/vt/vtctl/workflow/stream_migrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,22 @@ func (env *streamMigratorEnv) close() {
env.tenv.close()
}

func (env *streamMigratorEnv) addSourceQueries(queries []string) {
for _, id := range env.sourceTabletIds {
for _, q := range queries {
env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{})
}
}
}

func (env *streamMigratorEnv) addTargetQueries(queries []string) {
for _, id := range env.targetTabletIds {
for _, q := range queries {
env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{})
}
}
}

func newStreamMigratorEnv(ctx context.Context, t *testing.T, sourceKeyspace, targetKeyspace *testKeyspace) *streamMigratorEnv {
tenv := newTestEnv(t, ctx, "cell1", sourceKeyspace, targetKeyspace)
env := &streamMigratorEnv{tenv: tenv}
Expand Down Expand Up @@ -484,6 +500,50 @@ func addMaterializeWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sour
for _, resp := range workflowResponses {
env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp)
}
queries := []string{
fmt.Sprintf("select distinct vrepl_id from _vt.copy_state where vrepl_id in (%d)", id),
fmt.Sprintf("update _vt.vreplication set state='Stopped', message='for cutover' where id in (%d)", id),
fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')",
env.tenv.sourceKeyspace.KeyspaceName, wfName),
}
env.addSourceQueries(queries)
queries = []string{
fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')",
env.tenv.sourceKeyspace.KeyspaceName, wfName),
}
env.addTargetQueries(queries)

}

func addReferenceWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sourceShard string) {
var wfs tabletmanagerdata.ReadVReplicationWorkflowsResponse
wfName := "wfRef1"
wfs.Workflows = append(wfs.Workflows, &tabletmanagerdata.ReadVReplicationWorkflowResponse{
Workflow: wfName,
WorkflowType: binlogdatapb.VReplicationWorkflowType_Materialize,
})
wfs.Workflows[0].Streams = append(wfs.Workflows[0].Streams, &tabletmanagerdata.ReadVReplicationWorkflowResponse_Stream{
Id: id,
Bls: &binlogdatapb.BinlogSource{
Keyspace: env.tenv.sourceKeyspace.KeyspaceName,
Shard: sourceShard,
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
{Match: "ref", Filter: "select * from ref"},
},
},
},
Pos: position,
State: binlogdatapb.VReplicationWorkflowState_Running,
})
workflowKey := env.tenv.tmc.GetWorkflowKey(env.tenv.sourceKeyspace.KeyspaceName, sourceShard)
workflowResponses := []*tabletmanagerdata.ReadVReplicationWorkflowsResponse{
nil, // this is the response for getting stopped workflows
&wfs, &wfs, &wfs, // return the full list for subsequent GetWorkflows calls
}
for _, resp := range workflowResponses {
env.tenv.tmc.AddVReplicationWorkflowsResponse(workflowKey, resp)
}
for _, id := range env.sourceTabletIds {
queries := []string{
fmt.Sprintf("select distinct vrepl_id from _vt.copy_state where vrepl_id in (%d)", id),
Expand All @@ -496,7 +556,7 @@ func addMaterializeWorkflow(t *testing.T, env *streamMigratorEnv, id int32, sour
for _, id := range env.targetTabletIds {
queries := []string{
fmt.Sprintf("delete from _vt.vreplication where db_name='vt_%s' and workflow in ('%s')",
env.tenv.sourceKeyspace.KeyspaceName, wfName),
env.tenv.targetKeyspace.KeyspaceName, wfName),
}
for _, q := range queries {
env.tenv.tmc.expectVRQuery(id, q, &sqltypes.Result{})
Expand Down Expand Up @@ -534,6 +594,11 @@ func TestBuildStreamMigratorOneMaterialize(t *testing.T) {
require.Equal(t, 1, len(workflows))
require.NoError(t, sm.MigrateStreams(ctx))
require.Len(t, sm.templates, 1)
env.addTargetQueries([]string{
fmt.Sprintf("update _vt.vreplication set state='Running' where db_name='vt_%s' and workflow in ('%s')",
env.tenv.sourceKeyspace.KeyspaceName, "wfMat1"),
})
require.NoError(t, StreamMigratorFinalize(ctx, env.ts, []string{"wfMat1"}))
}

func TestBuildStreamMigratorNoStreams(t *testing.T) {
Expand All @@ -553,3 +618,23 @@ func TestBuildStreamMigratorNoStreams(t *testing.T) {
require.NoError(t, sm.MigrateStreams(ctx))
require.Len(t, sm.templates, 0)
}

func TestBuildStreamMigratorRefStream(t *testing.T) {
ctx := context.Background()
env := newStreamMigratorEnv(ctx, t, customerUnshardedKeyspace, customerShardedKeyspace)
defer env.close()

addReferenceWorkflow(t, env, 100, "0")

sm, err := BuildStreamMigrator(ctx, env.ts, false, sqlparser.NewTestParser())
require.NoError(t, err)
require.NotNil(t, sm)
require.NotNil(t, sm.streams)
require.Equal(t, 0, len(sm.streams))

workflows, err := sm.StopStreams(ctx)
require.NoError(t, err)
require.Equal(t, 0, len(workflows))
require.NoError(t, sm.MigrateStreams(ctx))
require.Len(t, sm.templates, 0)
}

0 comments on commit 8ca8200

Please sign in to comment.