@@ -250,7 +250,7 @@ func (sm *StreamMigrator) LegacyStopStreams(ctx context.Context) ([]string, erro
250
250
return sm .legacyVerifyStreamPositions (ctx , positions )
251
251
}
252
252
253
- // StopStreams stops streams
253
+ // StopStreams stops streams.
254
254
func (sm * StreamMigrator ) StopStreams (ctx context.Context ) ([]string , error ) {
255
255
if sm .streams == nil {
256
256
return nil , nil
@@ -684,6 +684,26 @@ func (sm *StreamMigrator) stopSourceStreams(ctx context.Context) error {
684
684
return nil
685
685
}
686
686
687
+ // For materialize workflows where the source and target are both the keyspace
688
+ // that is being resharded, we need to wait for those to catch up as well.
689
+ // New writes have already been blocked on the source, but the materialization
690
+ // workflow(s) may still need to catch up with writes that happened just before
691
+ // writes were stopped on the source.
692
+ for _ , vrs := range tabletStreams {
693
+ if vrs .WorkflowType == binlogdatapb .VReplicationWorkflowType_Materialize && vrs .BinlogSource .Keyspace == sm .ts .TargetKeyspaceName () {
694
+ tablet := source .GetPrimary ().Tablet
695
+ pos , err := sm .ts .TabletManagerClient ().PrimaryPosition (ctx , tablet )
696
+ if err != nil {
697
+ return err
698
+ }
699
+ sm .ts .Logger ().Infof ("Waiting for Materialization workflow %s on %v/%v to reach position %v, starting from position %s" ,
700
+ vrs .Workflow , sm .ts .SourceKeyspaceName (), vrs .BinlogSource .Shard , pos , vrs .Position )
701
+ if err := sm .ts .TabletManagerClient ().VReplicationWaitForPos (ctx , tablet , vrs .ID , pos ); err != nil {
702
+ return err
703
+ }
704
+ }
705
+ }
706
+
687
707
query := fmt .Sprintf ("update _vt.vreplication set state='Stopped', message='for cutover' where id in %s" , VReplicationStreams (tabletStreams ).Values ())
688
708
_ , err := sm .ts .TabletManagerClient ().VReplicationExec (ctx , source .GetPrimary ().Tablet , query )
689
709
if err != nil {
@@ -925,6 +945,9 @@ func (sm *StreamMigrator) createTargetStreams(ctx context.Context, tmpl []*VRepl
925
945
// 1 to 1 in this scenario so we use the target shard's name and primary
926
946
// tablet's position for the source.
927
947
vrs .BinlogSource .Shard = target .GetShard ().ShardName ()
948
+ // TODO: the problem is that the materialize stream may still need GTIDs
949
+ // from the OLD shards at this point... so we could miss writes that
950
+ // occurred on the source table(s) just before the switch.
928
951
vrs .Position , err = binlogplayer .DecodePosition (target .Position )
929
952
if err != nil {
930
953
return err
0 commit comments