Skip to content

Commit

Permalink
Call dropSources with the workflow action opts
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 13, 2025
1 parent 14b0032 commit 8b74a2a
Showing 1 changed file with 30 additions and 29 deletions.
59 changes: 30 additions & 29 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,23 +1294,14 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
return nil, ErrWorkflowCompleteNotFullySwitched
}

if req.IgnoreSourceKeyspace {
if err := s.dropArtifacts(ctx, req.KeepRoutingRules, &switcher{s: s, ts: ts}, opts...); err != nil {
return nil, vterrors.Wrapf(err, "failed to cleanup workflow artifacts")
}
if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {
return nil, err
}
var renameTable TableRemovalType
if req.RenameTables {
renameTable = RenameTable
} else {
var renameTable TableRemovalType
if req.RenameTables {
renameTable = RenameTable
} else {
renameTable = DropTable
}
if dryRunResults, err = s.dropSources(ctx, ts, renameTable, req.KeepData, req.KeepRoutingRules, false, req.DryRun); err != nil {
return nil, err
}
renameTable = DropTable
}
if dryRunResults, err = s.dropSources(ctx, ts, renameTable, req.KeepData, req.KeepRoutingRules, false, req.DryRun, opts...); err != nil {
return nil, err
}

resp := &vtctldatapb.MoveTablesCompleteResponse{
Expand Down Expand Up @@ -2228,30 +2219,40 @@ func (s *Server) dropRelatedArtifacts(ctx context.Context, keepRoutingRules bool

// dropSources cleans up source tables, shards and denied tables after a
// MoveTables/Reshard is completed.
func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalType TableRemovalType, keepData, keepRoutingRules, force, dryRun bool) (*[]string, error) {
func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalType TableRemovalType, keepData, keepRoutingRules, force, dryRun bool, opts ...WorkflowActionOption) (*[]string, error) {
wopts := processWorkflowActionOptions(opts)
var (
sw iswitcher
err error
sw iswitcher
err, lockErr error
sourceUnlock, targetUnlock func(*error)
)
if dryRun {
sw = &switcherDryRun{ts: ts, drLog: NewLogRecorder()}
} else {
sw = &switcher{ts: ts, s: s}
}

// Lock the source and target keyspaces.
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources")
if lockErr != nil {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer sourceUnlock(&err)
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources")
if wopts.ignoreSourceKeyspace {
// Lock only the target keyspace.
ctx, targetUnlock, lockErr = sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources")
if lockErr != nil {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)
ctx = lockCtx
} else {
// Lock the source and target keyspaces.
ctx, sourceUnlock, lockErr = sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources")
if lockErr != nil {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer sourceUnlock(&err)
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
ctx, targetUnlock, lockErr = sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources")
if lockErr != nil {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)
}
}

if !force {
Expand All @@ -2260,7 +2261,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
return nil, err
}
}
if !keepData {
if !keepData && !wopts.ignoreSourceKeyspace {
switch ts.MigrationType() {
case binlogdatapb.MigrationType_TABLES:
s.Logger().Infof("Deleting tables")
Expand Down

0 comments on commit 8b74a2a

Please sign in to comment.