@@ -428,8 +428,12 @@ func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowN
428
428
return s .getWorkflowState (ctx , targetKeyspace , workflowName )
429
429
}
430
430
431
- func (s * Server ) getWorkflowState (ctx context.Context , targetKeyspace , workflowName string ) (* trafficSwitcher , * State , error ) {
432
- ts , err := s .buildTrafficSwitcher (ctx , targetKeyspace , workflowName )
431
+ func (s * Server ) getWorkflowState (ctx context.Context , targetKeyspace , workflowName string , opts ... WorkflowOption ) (* trafficSwitcher , * State , error ) {
432
+ var options workflowOptions
433
+ for _ , o := range opts {
434
+ o .apply (& options )
435
+ }
436
+ ts , err := s .buildTrafficSwitcher (ctx , targetKeyspace , workflowName , opts ... )
433
437
if err != nil {
434
438
s .Logger ().Errorf ("buildTrafficSwitcher failed: %v" , err )
435
439
return nil , nil , err
@@ -442,6 +446,13 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
442
446
IsPartialMigration : ts .isPartialMigration ,
443
447
}
444
448
449
+ if ts .workflowType == binlogdatapb .VReplicationWorkflowType_MoveTables && options .ignoreSourceKeyspace {
450
+ if err := s .updateTablesTrafficState (ctx , state , ts .tables ); err != nil {
451
+ return nil , nil , err
452
+ }
453
+ return ts , state , nil
454
+ }
455
+
445
456
if ts .workflowType == binlogdatapb .VReplicationWorkflowType_CreateLookupIndex {
446
457
// Nothing left to do.
447
458
return ts , state , nil
@@ -471,7 +482,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
471
482
if len (ts .Tables ()) == 0 {
472
483
return nil , nil , vterrors .Errorf (vtrpcpb .Code_FAILED_PRECONDITION , "no tables in workflow %s.%s" , targetKeyspace , workflowName )
473
484
}
474
- table := ts .Tables ()[0 ]
475
485
476
486
if ts .IsMultiTenantMigration () {
477
487
// Deduce which traffic has been switched by looking at the current keyspace routing rules.
@@ -497,29 +507,9 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
497
507
}
498
508
}
499
509
} else {
500
- state .RdonlyCellsSwitched , state .RdonlyCellsNotSwitched , err = s .GetCellsWithTableReadsSwitched (ctx , sourceKeyspace , targetKeyspace , table , topodatapb .TabletType_RDONLY )
501
- if err != nil {
502
- return nil , nil , err
503
- }
504
-
505
- state .ReplicaCellsSwitched , state .ReplicaCellsNotSwitched , err = s .GetCellsWithTableReadsSwitched (ctx , sourceKeyspace , targetKeyspace , table , topodatapb .TabletType_REPLICA )
506
- if err != nil {
510
+ if err := s .updateTablesTrafficState (ctx , state , ts .tables ); err != nil {
507
511
return nil , nil , err
508
512
}
509
- globalRules , err := topotools .GetRoutingRules (ctx , ts .TopoServer ())
510
- if err != nil {
511
- return nil , nil , err
512
- }
513
- for _ , table := range ts .Tables () {
514
- // If a rule for the primary tablet type exists for any table and points to the target keyspace,
515
- // then writes have been switched.
516
- ruleKey := fmt .Sprintf ("%s.%s" , sourceKeyspace , table )
517
- rr := globalRules [ruleKey ]
518
- if len (rr ) > 0 && rr [0 ] != ruleKey {
519
- state .WritesSwitched = true
520
- break
521
- }
522
- }
523
513
}
524
514
} else {
525
515
state .WorkflowType = TypeReshard
@@ -1076,7 +1066,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
1076
1066
err = vterrors .Wrapf (err , "failed to cleanup denied table entries: %v" , cerr )
1077
1067
}
1078
1068
}
1079
- if cerr := s .dropArtifacts (ctx , false , false , & switcher {s : s , ts : ts }); cerr != nil {
1069
+ if cerr := s .dropArtifacts (ctx , false , & switcher {s : s , ts : ts }); cerr != nil {
1080
1070
err = vterrors .Wrapf (err , "failed to cleanup workflow artifacts: %v" , cerr )
1081
1071
}
1082
1072
if origVSchema == nil { // There's no previous version to restore
@@ -1249,7 +1239,11 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
1249
1239
span , ctx := trace .NewSpan (ctx , "workflow.Server.MoveTablesComplete" )
1250
1240
defer span .Finish ()
1251
1241
1252
- ts , state , err := s .getWorkflowState (ctx , req .GetTargetKeyspace (), req .GetWorkflow ())
1242
+ opts := []WorkflowOption {}
1243
+ if req .IgnoreSourceKeyspace {
1244
+ opts = append (opts , IgnoreSourceKeyspace ())
1245
+ }
1246
+ ts , state , err := s .getWorkflowState (ctx , req .GetTargetKeyspace (), req .GetWorkflow (), opts ... )
1253
1247
if err != nil {
1254
1248
return nil , err
1255
1249
}
@@ -1291,7 +1285,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
1291
1285
}
1292
1286
1293
1287
if req .IgnoreSourceKeyspace {
1294
- if err := s .dropArtifacts (ctx , req .KeepRoutingRules , true , & switcher {s : s , ts : ts }); err != nil {
1288
+ if err := s .dropArtifacts (ctx , req .KeepRoutingRules , & switcher {s : s , ts : ts }, opts ... ); err != nil {
1295
1289
return nil , vterrors .Wrapf (err , "failed to cleanup workflow artifacts" )
1296
1290
}
1297
1291
if err := ts .TopoServer ().RebuildSrvVSchema (ctx , nil ); err != nil {
@@ -2053,7 +2047,11 @@ func (s *Server) deleteTenantData(ctx context.Context, ts *trafficSwitcher, batc
2053
2047
})
2054
2048
}
2055
2049
2056
- func (s * Server ) buildTrafficSwitcher (ctx context.Context , targetKeyspace , workflowName string ) (* trafficSwitcher , error ) {
2050
+ func (s * Server ) buildTrafficSwitcher (ctx context.Context , targetKeyspace , workflowName string , opts ... WorkflowOption ) (* trafficSwitcher , error ) {
2051
+ var options workflowOptions
2052
+ for _ , o := range opts {
2053
+ o .apply (& options )
2054
+ }
2057
2055
tgtInfo , err := BuildTargets (ctx , s .ts , s .tmc , targetKeyspace , workflowName )
2058
2056
if err != nil {
2059
2057
s .Logger ().Infof ("Error building targets: %s" , err )
@@ -2120,6 +2118,9 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
2120
2118
}
2121
2119
}
2122
2120
2121
+ if options .ignoreSourceKeyspace {
2122
+ continue
2123
+ }
2123
2124
if _ , ok := ts .sources [bls .Shard ]; ok {
2124
2125
continue
2125
2126
}
@@ -2152,6 +2153,10 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
2152
2153
}
2153
2154
}
2154
2155
}
2156
+ if ts .workflowType == binlogdatapb .VReplicationWorkflowType_MoveTables && options .ignoreSourceKeyspace {
2157
+ log .Errorf ("DEBUG: Ignoring source keyspace for MoveTables workflow with source Keyspace %s" , ts .sourceKeyspace )
2158
+ return ts , nil
2159
+ }
2155
2160
vs , err := sourceTopo .GetVSchema (ctx , ts .sourceKeyspace )
2156
2161
if err != nil {
2157
2162
return nil , err
@@ -2247,7 +2252,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
2247
2252
}
2248
2253
}
2249
2254
}
2250
- if err := s .dropArtifacts (ctx , keepRoutingRules , false , sw ); err != nil {
2255
+ if err := s .dropArtifacts (ctx , keepRoutingRules , sw ); err != nil {
2251
2256
return nil , err
2252
2257
}
2253
2258
if err := ts .TopoServer ().RebuildSrvVSchema (ctx , nil ); err != nil {
@@ -2257,8 +2262,12 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
2257
2262
return sw .logs (), nil
2258
2263
}
2259
2264
2260
- func (s * Server ) dropArtifacts (ctx context.Context , keepRoutingRules , ignoreSourceKeyspace bool , sw iswitcher ) error {
2261
- if ! ignoreSourceKeyspace {
2265
+ func (s * Server ) dropArtifacts (ctx context.Context , keepRoutingRules bool , sw iswitcher , opts ... WorkflowOption ) error {
2266
+ var options workflowOptions
2267
+ for _ , o := range opts {
2268
+ o .apply (& options )
2269
+ }
2270
+ if ! options .ignoreSourceKeyspace {
2262
2271
if err := sw .dropSourceReverseVReplicationStreams (ctx ); err != nil {
2263
2272
return err
2264
2273
}
@@ -3456,6 +3465,39 @@ func (s *Server) validateShardsHaveVReplicationPermissions(ctx context.Context,
3456
3465
return nil
3457
3466
}
3458
3467
3468
+ func (s * Server ) updateTablesTrafficState (ctx context.Context , state * State , tables []string ) error {
3469
+ // We assume a consistent state, so only choose routing rule for one table.
3470
+ if len (tables ) == 0 {
3471
+ return vterrors .Errorf (vtrpcpb .Code_FAILED_PRECONDITION , "no tables in workflow %s.%s" , state .TargetKeyspace , state .Workflow )
3472
+ }
3473
+
3474
+ var err error
3475
+ state .RdonlyCellsSwitched , state .RdonlyCellsNotSwitched , err = s .GetCellsWithTableReadsSwitched (ctx , state .SourceKeyspace , state .TargetKeyspace , tables [0 ], topodatapb .TabletType_RDONLY )
3476
+ if err != nil {
3477
+ return err
3478
+ }
3479
+ state .ReplicaCellsSwitched , state .ReplicaCellsNotSwitched , err = s .GetCellsWithTableReadsSwitched (ctx , state .SourceKeyspace , state .TargetKeyspace , tables [0 ], topodatapb .TabletType_REPLICA )
3480
+ if err != nil {
3481
+ return err
3482
+ }
3483
+ globalRules , err := topotools .GetRoutingRules (ctx , s .ts )
3484
+ if err != nil {
3485
+ return err
3486
+ }
3487
+ for _ , table := range tables {
3488
+ // If a rule for the primary tablet type exists for any table and points to the target keyspace,
3489
+ // then writes have been switched.
3490
+ ruleKey := fmt .Sprintf ("%s.%s" , state .SourceKeyspace , table )
3491
+ rr := globalRules [ruleKey ]
3492
+ if len (rr ) > 0 && rr [0 ] != ruleKey {
3493
+ state .WritesSwitched = true
3494
+ break
3495
+ }
3496
+ }
3497
+ log .Errorf ("DEBUG: state: %+v" , state )
3498
+ return err
3499
+ }
3500
+
3459
3501
func (s * Server ) Logger () logutil.Logger {
3460
3502
if s .options .logger == nil {
3461
3503
s .options .logger = logutil .NewConsoleLogger () // Use default system logger
0 commit comments