Skip to content

Commit

Permalink
Update source positions once we know all writes are done.
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Aug 9, 2024
1 parent bf0c5f8 commit 6e882dc
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 0 deletions.
7 changes: 7 additions & 0 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3498,6 +3498,13 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
}
}

// Ensure that the source positions are correct now that writes are stopped, the streams were
// stopped (e.g. intra-keyspace materializations that write on the source), and any in progress
// writes are done.
if err := ts.updateSourcePositions(ctx); err != nil {
return handleError("failed to update source replication positions", err)
}

if err := confirmKeyspaceLocksHeld(); err != nil {
return handleError("locks were lost", err)
}
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,17 @@ func (ts *trafficSwitcher) gatherPositions(ctx context.Context) error {
})
}

// updateSourcePositions will update the Position for all migration sources.
func (ts *trafficSwitcher) updateSourcePositions(ctx context.Context) error {
err := ts.ForAllSources(func(source *MigrationSource) error {
var err error
source.Position, err = ts.ws.tmc.PrimaryPosition(ctx, source.GetPrimary().Tablet)
ts.Logger().Infof("Updated position for source %v:%v: %v", ts.SourceKeyspaceName(), source.GetShard().ShardName(), source.Position)
return err
})
return err
}

func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) {
vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace)
if err != nil {
Expand Down

0 comments on commit 6e882dc

Please sign in to comment.