From 80454e7c6ac74c6ea53c35464015ad1575020611 Mon Sep 17 00:00:00 2001 From: Noble Mittal Date: Mon, 6 Jan 2025 23:05:00 +0530 Subject: [PATCH] Move VDiff related workflow server APIs to vdiff.go and add unit tests Signed-off-by: Noble Mittal --- go/vt/vtctl/workflow/server.go | 563 ++++++++++++++-------------- go/vt/vtctl/workflow/server_test.go | 502 ++++++++++++------------- go/vt/vtctl/workflow/vdiff.go | 291 ++++++++++++++ go/vt/vtctl/workflow/vdiff_test.go | 355 ++++++++++++++++++ 4 files changed, 1176 insertions(+), 535 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8123416eb41..b02c851ef39 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -20,7 +20,6 @@ import ( "context" "errors" "fmt" - "math" "slices" "sort" "strings" @@ -28,7 +27,6 @@ import ( "text/template" "time" - "github.com/google/uuid" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/grpc/codes" @@ -41,7 +39,6 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -1249,286 +1246,286 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea }) } -// VDiffCreate is part of the vtctlservicepb.VtctldServer interface. -// It passes on the request to the target primary tablets that are -// participating in the given workflow and VDiff. -func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffCreate") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("source_cells", req.SourceCells) - span.Annotate("target_cells", req.TargetCells) - span.Annotate("tablet_types", req.TabletTypes) - span.Annotate("tables", req.Tables) - span.Annotate("auto_retry", req.AutoRetry) - span.Annotate("max_diff_duration", req.MaxDiffDuration) - if req.AutoStart != nil { - span.Annotate("auto_start", req.GetAutoStart()) - } - - var err error - req.Uuid = strings.TrimSpace(req.Uuid) - if req.Uuid == "" { // Generate a UUID - req.Uuid = uuid.New().String() - } else { // Validate UUID if provided - if err = uuid.Validate(req.Uuid); err != nil { - return nil, vterrors.Wrapf(err, "invalid UUID provided: %s", req.Uuid) - } - } - - tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) - - if req.Limit == 0 { // This would produce no useful results - req.Limit = math.MaxInt64 - } - // This is a pointer so there's no ZeroValue in the message - // and an older v18 client will not provide it. - if req.MaxDiffDuration == nil { - req.MaxDiffDuration = &vttimepb.Duration{} - } - // The other vttime.Duration vars should not be nil as the - // client should always provide them, but we check anyway to - // be safe. - if req.FilteredReplicationWaitTime == nil { - // A value of 0 is not valid as the vdiff will never succeed. - req.FilteredReplicationWaitTime = &vttimepb.Duration{ - Seconds: int64(DefaultTimeout.Seconds()), - } - } - if req.WaitUpdateInterval == nil { - req.WaitUpdateInterval = &vttimepb.Duration{} - } - - autoStart := true - if req.AutoStart != nil { - autoStart = req.GetAutoStart() - } - - options := &tabletmanagerdatapb.VDiffOptions{ - PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ - TabletTypes: tabletTypesStr, - SourceCell: strings.Join(req.SourceCells, ","), - TargetCell: strings.Join(req.TargetCells, ","), - }, - CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ - Tables: strings.Join(req.Tables, ","), - AutoRetry: req.AutoRetry, - MaxRows: req.Limit, - TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds, - MaxExtraRowsToCompare: req.MaxExtraRowsToCompare, - UpdateTableStats: req.UpdateTableStats, - MaxDiffSeconds: req.MaxDiffDuration.Seconds, - AutoStart: &autoStart, - }, - ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ - OnlyPks: req.OnlyPKs, - DebugQuery: req.DebugQuery, - MaxSampleRows: req.MaxReportSampleRows, - RowDiffColumnTruncateAt: req.RowDiffColumnTruncateAt, - }, - } - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.CreateAction), - Options: options, - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - if ts.frozen { - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s", - req.TargetKeyspace, req.Workflow) - } - - workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { - s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) - return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, - "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff create action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffCreateResponse{ - UUID: req.Uuid, - }, nil -} - -// VDiffDelete is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffDelete") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("argument", req.Arg) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.DeleteAction), - ActionArg: req.Arg, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff delete action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffDeleteResponse{}, nil -} - -// VDiffResume is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume") - defer span.Finish() - - targetShards := req.GetTargetShards() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("target_shards", targetShards) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.ResumeAction), - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - if len(targetShards) > 0 { - if err := applyTargetShards(ts, targetShards); err != nil { - return nil, err - } - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff resume action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffResumeResponse{}, nil -} - -// VDiffShow is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow") - defer span.Finish() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("argument", req.Arg) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.ShowAction), - ActionArg: req.Arg, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - output := &vdiffOutput{ - responses: make(map[string]*tabletmanagerdatapb.VDiffResponse, len(ts.targets)), - err: nil, - } - output.err = ts.ForAllTargets(func(target *MigrationTarget) error { - resp, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - output.mu.Lock() - defer output.mu.Unlock() - output.responses[target.GetShard().ShardName()] = resp - return err - }) - if output.err != nil { - s.Logger().Errorf("Error executing vdiff show action: %v", output.err) - return nil, output.err - } - return &vtctldatapb.VDiffShowResponse{ - TabletResponses: output.responses, - }, nil -} - -// VDiffStop is part of the vtctlservicepb.VtctldServer interface. -func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error) { - span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffStop") - defer span.Finish() - - targetShards := req.GetTargetShards() - - span.Annotate("keyspace", req.TargetKeyspace) - span.Annotate("workflow", req.Workflow) - span.Annotate("uuid", req.Uuid) - span.Annotate("target_shards", targetShards) - - tabletreq := &tabletmanagerdatapb.VDiffRequest{ - Keyspace: req.TargetKeyspace, - Workflow: req.Workflow, - Action: string(vdiff.StopAction), - VdiffUuid: req.Uuid, - } - - ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) - if err != nil { - return nil, err - } - - if len(targetShards) > 0 { - if err := applyTargetShards(ts, targetShards); err != nil { - return nil, err - } - } - - err = ts.ForAllTargets(func(target *MigrationTarget) error { - _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) - return err - }) - if err != nil { - s.Logger().Errorf("Error executing vdiff stop action: %v", err) - return nil, err - } - - return &vtctldatapb.VDiffStopResponse{}, nil -} +// // VDiffCreate is part of the vtctlservicepb.VtctldServer interface. +// // It passes on the request to the target primary tablets that are +// // participating in the given workflow and VDiff. +// func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error) { +// span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffCreate") +// defer span.Finish() + +// span.Annotate("keyspace", req.TargetKeyspace) +// span.Annotate("workflow", req.Workflow) +// span.Annotate("uuid", req.Uuid) +// span.Annotate("source_cells", req.SourceCells) +// span.Annotate("target_cells", req.TargetCells) +// span.Annotate("tablet_types", req.TabletTypes) +// span.Annotate("tables", req.Tables) +// span.Annotate("auto_retry", req.AutoRetry) +// span.Annotate("max_diff_duration", req.MaxDiffDuration) +// if req.AutoStart != nil { +// span.Annotate("auto_start", req.GetAutoStart()) +// } + +// var err error +// req.Uuid = strings.TrimSpace(req.Uuid) +// if req.Uuid == "" { // Generate a UUID +// req.Uuid = uuid.New().String() +// } else { // Validate UUID if provided +// if err = uuid.Validate(req.Uuid); err != nil { +// return nil, vterrors.Wrapf(err, "invalid UUID provided: %s", req.Uuid) +// } +// } + +// tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) + +// if req.Limit == 0 { // This would produce no useful results +// req.Limit = math.MaxInt64 +// } +// // This is a pointer so there's no ZeroValue in the message +// // and an older v18 client will not provide it. +// if req.MaxDiffDuration == nil { +// req.MaxDiffDuration = &vttimepb.Duration{} +// } +// // The other vttime.Duration vars should not be nil as the +// // client should always provide them, but we check anyway to +// // be safe. +// if req.FilteredReplicationWaitTime == nil { +// // A value of 0 is not valid as the vdiff will never succeed. +// req.FilteredReplicationWaitTime = &vttimepb.Duration{ +// Seconds: int64(DefaultTimeout.Seconds()), +// } +// } +// if req.WaitUpdateInterval == nil { +// req.WaitUpdateInterval = &vttimepb.Duration{} +// } + +// autoStart := true +// if req.AutoStart != nil { +// autoStart = req.GetAutoStart() +// } + +// options := &tabletmanagerdatapb.VDiffOptions{ +// PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ +// TabletTypes: tabletTypesStr, +// SourceCell: strings.Join(req.SourceCells, ","), +// TargetCell: strings.Join(req.TargetCells, ","), +// }, +// CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ +// Tables: strings.Join(req.Tables, ","), +// AutoRetry: req.AutoRetry, +// MaxRows: req.Limit, +// TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds, +// MaxExtraRowsToCompare: req.MaxExtraRowsToCompare, +// UpdateTableStats: req.UpdateTableStats, +// MaxDiffSeconds: req.MaxDiffDuration.Seconds, +// AutoStart: &autoStart, +// }, +// ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ +// OnlyPks: req.OnlyPKs, +// DebugQuery: req.DebugQuery, +// MaxSampleRows: req.MaxReportSampleRows, +// RowDiffColumnTruncateAt: req.RowDiffColumnTruncateAt, +// }, +// } + +// tabletreq := &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: req.TargetKeyspace, +// Workflow: req.Workflow, +// Action: string(vdiff.CreateAction), +// Options: options, +// VdiffUuid: req.Uuid, +// } + +// ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) +// if err != nil { +// return nil, err +// } +// if ts.frozen { +// return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s", +// req.TargetKeyspace, req.Workflow) +// } + +// workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) +// if err != nil { +// return nil, err +// } +// if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { +// s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) +// return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, +// "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) +// } + +// err = ts.ForAllTargets(func(target *MigrationTarget) error { +// _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) +// return err +// }) +// if err != nil { +// s.Logger().Errorf("Error executing vdiff create action: %v", err) +// return nil, err +// } + +// return &vtctldatapb.VDiffCreateResponse{ +// UUID: req.Uuid, +// }, nil +// } + +// // VDiffDelete is part of the vtctlservicepb.VtctldServer interface. +// func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error) { +// span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffDelete") +// defer span.Finish() + +// span.Annotate("keyspace", req.TargetKeyspace) +// span.Annotate("workflow", req.Workflow) +// span.Annotate("argument", req.Arg) + +// tabletreq := &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: req.TargetKeyspace, +// Workflow: req.Workflow, +// Action: string(vdiff.DeleteAction), +// ActionArg: req.Arg, +// } + +// ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) +// if err != nil { +// return nil, err +// } + +// err = ts.ForAllTargets(func(target *MigrationTarget) error { +// _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) +// return err +// }) +// if err != nil { +// s.Logger().Errorf("Error executing vdiff delete action: %v", err) +// return nil, err +// } + +// return &vtctldatapb.VDiffDeleteResponse{}, nil +// } + +// // VDiffResume is part of the vtctlservicepb.VtctldServer interface. +// func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) { +// span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume") +// defer span.Finish() + +// targetShards := req.GetTargetShards() + +// span.Annotate("keyspace", req.TargetKeyspace) +// span.Annotate("workflow", req.Workflow) +// span.Annotate("uuid", req.Uuid) +// span.Annotate("target_shards", targetShards) + +// tabletreq := &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: req.TargetKeyspace, +// Workflow: req.Workflow, +// Action: string(vdiff.ResumeAction), +// VdiffUuid: req.Uuid, +// } + +// ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) +// if err != nil { +// return nil, err +// } + +// if len(targetShards) > 0 { +// if err := applyTargetShards(ts, targetShards); err != nil { +// return nil, err +// } +// } + +// err = ts.ForAllTargets(func(target *MigrationTarget) error { +// _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) +// return err +// }) +// if err != nil { +// s.Logger().Errorf("Error executing vdiff resume action: %v", err) +// return nil, err +// } + +// return &vtctldatapb.VDiffResumeResponse{}, nil +// } + +// // VDiffShow is part of the vtctlservicepb.VtctldServer interface. +// func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) { +// span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow") +// defer span.Finish() + +// span.Annotate("keyspace", req.TargetKeyspace) +// span.Annotate("workflow", req.Workflow) +// span.Annotate("argument", req.Arg) + +// tabletreq := &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: req.TargetKeyspace, +// Workflow: req.Workflow, +// Action: string(vdiff.ShowAction), +// ActionArg: req.Arg, +// } + +// ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) +// if err != nil { +// return nil, err +// } + +// output := &vdiffOutput{ +// responses: make(map[string]*tabletmanagerdatapb.VDiffResponse, len(ts.targets)), +// err: nil, +// } +// output.err = ts.ForAllTargets(func(target *MigrationTarget) error { +// resp, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) +// output.mu.Lock() +// defer output.mu.Unlock() +// output.responses[target.GetShard().ShardName()] = resp +// return err +// }) +// if output.err != nil { +// s.Logger().Errorf("Error executing vdiff show action: %v", output.err) +// return nil, output.err +// } +// return &vtctldatapb.VDiffShowResponse{ +// TabletResponses: output.responses, +// }, nil +// } + +// // VDiffStop is part of the vtctlservicepb.VtctldServer interface. +// func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error) { +// span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffStop") +// defer span.Finish() + +// targetShards := req.GetTargetShards() + +// span.Annotate("keyspace", req.TargetKeyspace) +// span.Annotate("workflow", req.Workflow) +// span.Annotate("uuid", req.Uuid) +// span.Annotate("target_shards", targetShards) + +// tabletreq := &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: req.TargetKeyspace, +// Workflow: req.Workflow, +// Action: string(vdiff.StopAction), +// VdiffUuid: req.Uuid, +// } + +// ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) +// if err != nil { +// return nil, err +// } + +// if len(targetShards) > 0 { +// if err := applyTargetShards(ts, targetShards); err != nil { +// return nil, err +// } +// } + +// err = ts.ForAllTargets(func(target *MigrationTarget) error { +// _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) +// return err +// }) +// if err != nil { +// s.Logger().Errorf("Error executing vdiff stop action: %v", err) +// return nil, err +// } + +// return &vtctldatapb.VDiffStopResponse{}, nil +// } // WorkflowDelete is part of the vtctlservicepb.VtctldServer interface. // It passes on the request to the target primary tablets that are diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index 26d722f1de0..aaee75697a8 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -27,7 +27,6 @@ import ( "testing" "time" - "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/encoding/prototext" @@ -39,7 +38,6 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -191,256 +189,256 @@ func TestCheckReshardingJournalExistsOnTablet(t *testing.T) { } } -// TestVDiffCreate performs some basic tests of the VDiffCreate function -// to ensure that it behaves as expected given a specific request. -func TestVDiffCreate(t *testing.T) { - ctx := context.Background() - workflowName := "wf1" - sourceKeyspace := &testKeyspace{ - KeyspaceName: "source", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "target", - ShardNames: []string{"-80", "80-"}, - } - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - tests := []struct { - name string - req *vtctldatapb.VDiffCreateRequest - wantErr string - }{ - { - name: "no values", - req: &vtctldatapb.VDiffCreateRequest{}, - // We did not provide any keyspace or shard. - wantErr: "FindAllShardsInKeyspace() invalid keyspace name: UnescapeID err: invalid input identifier ''", - }, - { - name: "generated UUID", - req: &vtctldatapb.VDiffCreateRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflowName, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.wantErr == "" { - env.tmc.expectVRQueryResultOnKeyspaceTablets(targetKeyspace.KeyspaceName, &queryResult{ - query: "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", - result: &querypb.QueryResult{}, - }) - } - got, err := env.ws.VDiffCreate(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - return - } - require.NoError(t, err) - require.NotNil(t, got) - // Ensure that we always use a valid UUID. - err = uuid.Validate(got.UUID) - require.NoError(t, err) - }) - } -} - -func TestVDiffResume(t *testing.T) { - ctx := context.Background() - sourceKeyspace := &testKeyspace{ - KeyspaceName: "sourceks", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "targetks", - ShardNames: []string{"-80", "80-"}, - } - workflow := "testwf" - uuid := uuid.New().String() - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - env.tmc.strict = true - action := string(vdiff.ResumeAction) - - tests := []struct { - name string - req *vtctldatapb.VDiffResumeRequest // vtctld requests - expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests - wantErr string - }{ - { - name: "basic resume", // Both target shards - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "resume on first shard", - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: targetKeyspace.ShardNames[:1], - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "resume on invalid shard", - req: &vtctldatapb.VDiffResumeRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: []string{"0"}, - Workflow: workflow, - Uuid: uuid, - }, - wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for tab, vdr := range tt.expectedVDiffRequests { - env.tmc.expectVDiffRequest(tab, vdr) - } - got, err := env.ws.VDiffResume(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - } else { - require.NoError(t, err) - require.NotNil(t, got) - } - env.tmc.confirmVDiffRequests(t) - }) - } -} - -func TestVDiffStop(t *testing.T) { - ctx := context.Background() - sourceKeyspace := &testKeyspace{ - KeyspaceName: "sourceks", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "targetks", - ShardNames: []string{"-80", "80-"}, - } - workflow := "testwf" - uuid := uuid.New().String() - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - - env.tmc.strict = true - action := string(vdiff.StopAction) - - tests := []struct { - name string - req *vtctldatapb.VDiffStopRequest // vtctld requests - expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests - wantErr string - }{ - { - name: "basic stop", // Both target shards - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "stop on first shard", - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: targetKeyspace.ShardNames[:1], - Workflow: workflow, - Uuid: uuid, - }, - expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ - env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { - req: &tabletmanagerdatapb.VDiffRequest{ - Keyspace: targetKeyspace.KeyspaceName, - Workflow: workflow, - Action: action, - VdiffUuid: uuid, - }, - }, - }, - }, - { - name: "stop on invalid shard", - req: &vtctldatapb.VDiffStopRequest{ - TargetKeyspace: targetKeyspace.KeyspaceName, - TargetShards: []string{"0"}, - Workflow: workflow, - Uuid: uuid, - }, - wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - for tab, vdr := range tt.expectedVDiffRequests { - env.tmc.expectVDiffRequest(tab, vdr) - } - got, err := env.ws.VDiffStop(ctx, tt.req) - if tt.wantErr != "" { - require.EqualError(t, err, tt.wantErr) - } else { - require.NoError(t, err) - require.NotNil(t, got) - } - env.tmc.confirmVDiffRequests(t) - }) - } -} +// // TestVDiffCreate performs some basic tests of the VDiffCreate function +// // to ensure that it behaves as expected given a specific request. +// func TestVDiffCreate(t *testing.T) { +// ctx := context.Background() +// workflowName := "wf1" +// sourceKeyspace := &testKeyspace{ +// KeyspaceName: "source", +// ShardNames: []string{"0"}, +// } +// targetKeyspace := &testKeyspace{ +// KeyspaceName: "target", +// ShardNames: []string{"-80", "80-"}, +// } +// env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) +// defer env.close() + +// tests := []struct { +// name string +// req *vtctldatapb.VDiffCreateRequest +// wantErr string +// }{ +// { +// name: "no values", +// req: &vtctldatapb.VDiffCreateRequest{}, +// // We did not provide any keyspace or shard. +// wantErr: "FindAllShardsInKeyspace() invalid keyspace name: UnescapeID err: invalid input identifier ''", +// }, +// { +// name: "generated UUID", +// req: &vtctldatapb.VDiffCreateRequest{ +// TargetKeyspace: targetKeyspace.KeyspaceName, +// Workflow: workflowName, +// }, +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// if tt.wantErr == "" { +// env.tmc.expectVRQueryResultOnKeyspaceTablets(targetKeyspace.KeyspaceName, &queryResult{ +// query: "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", +// result: &querypb.QueryResult{}, +// }) +// } +// got, err := env.ws.VDiffCreate(ctx, tt.req) +// if tt.wantErr != "" { +// require.EqualError(t, err, tt.wantErr) +// return +// } +// require.NoError(t, err) +// require.NotNil(t, got) +// // Ensure that we always use a valid UUID. +// err = uuid.Validate(got.UUID) +// require.NoError(t, err) +// }) +// } +// } + +// func TestVDiffResume(t *testing.T) { +// ctx := context.Background() +// sourceKeyspace := &testKeyspace{ +// KeyspaceName: "sourceks", +// ShardNames: []string{"0"}, +// } +// targetKeyspace := &testKeyspace{ +// KeyspaceName: "targetks", +// ShardNames: []string{"-80", "80-"}, +// } +// workflow := "testwf" +// uuid := uuid.New().String() +// env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) +// defer env.close() + +// env.tmc.strict = true +// action := string(vdiff.ResumeAction) + +// tests := []struct { +// name string +// req *vtctldatapb.VDiffResumeRequest // vtctld requests +// expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests +// wantErr string +// }{ +// { +// name: "basic resume", // Both target shards +// req: &vtctldatapb.VDiffResumeRequest{ +// TargetKeyspace: targetKeyspace.KeyspaceName, +// Workflow: workflow, +// Uuid: uuid, +// }, +// expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ +// env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { +// req: &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: targetKeyspace.KeyspaceName, +// Workflow: workflow, +// Action: action, +// VdiffUuid: uuid, +// }, +// }, +// env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { +// req: &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: targetKeyspace.KeyspaceName, +// Workflow: workflow, +// Action: action, +// VdiffUuid: uuid, +// }, +// }, +// }, +// }, +// { +// name: "resume on first shard", +// req: &vtctldatapb.VDiffResumeRequest{ +// TargetKeyspace: targetKeyspace.KeyspaceName, +// TargetShards: targetKeyspace.ShardNames[:1], +// Workflow: workflow, +// Uuid: uuid, +// }, +// expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ +// env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { +// req: &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: targetKeyspace.KeyspaceName, +// Workflow: workflow, +// Action: action, +// VdiffUuid: uuid, +// }, +// }, +// }, +// }, +// { +// name: "resume on invalid shard", +// req: &vtctldatapb.VDiffResumeRequest{ +// TargetKeyspace: targetKeyspace.KeyspaceName, +// TargetShards: []string{"0"}, +// Workflow: workflow, +// Uuid: uuid, +// }, +// wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// for tab, vdr := range tt.expectedVDiffRequests { +// env.tmc.expectVDiffRequest(tab, vdr) +// } +// got, err := env.ws.VDiffResume(ctx, tt.req) +// if tt.wantErr != "" { +// require.EqualError(t, err, tt.wantErr) +// } else { +// require.NoError(t, err) +// require.NotNil(t, got) +// } +// env.tmc.confirmVDiffRequests(t) +// }) +// } +// } + +// func TestVDiffStop(t *testing.T) { +// ctx := context.Background() +// sourceKeyspace := &testKeyspace{ +// KeyspaceName: "sourceks", +// ShardNames: []string{"0"}, +// } +// targetKeyspace := &testKeyspace{ +// KeyspaceName: "targetks", +// ShardNames: []string{"-80", "80-"}, +// } +// workflow := "testwf" +// uuid := uuid.New().String() +// env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) +// defer env.close() + +// env.tmc.strict = true +// action := string(vdiff.StopAction) + +// tests := []struct { +// name string +// req *vtctldatapb.VDiffStopRequest // vtctld requests +// expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests +// wantErr string +// }{ +// { +// name: "basic stop", // Both target shards +// req: &vtctldatapb.VDiffStopRequest{ +// TargetKeyspace: targetKeyspace.KeyspaceName, +// Workflow: workflow, +// Uuid: uuid, +// }, +// expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ +// env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { +// req: &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: targetKeyspace.KeyspaceName, +// Workflow: workflow, +// Action: action, +// VdiffUuid: uuid, +// }, +// }, +// env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { +// req: &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: targetKeyspace.KeyspaceName, +// Workflow: workflow, +// Action: action, +// VdiffUuid: uuid, +// }, +// }, +// }, +// }, +// { +// name: "stop on first shard", +// req: &vtctldatapb.VDiffStopRequest{ +// TargetKeyspace: targetKeyspace.KeyspaceName, +// TargetShards: targetKeyspace.ShardNames[:1], +// Workflow: workflow, +// Uuid: uuid, +// }, +// expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ +// env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { +// req: &tabletmanagerdatapb.VDiffRequest{ +// Keyspace: targetKeyspace.KeyspaceName, +// Workflow: workflow, +// Action: action, +// VdiffUuid: uuid, +// }, +// }, +// }, +// }, +// { +// name: "stop on invalid shard", +// req: &vtctldatapb.VDiffStopRequest{ +// TargetKeyspace: targetKeyspace.KeyspaceName, +// TargetShards: []string{"0"}, +// Workflow: workflow, +// Uuid: uuid, +// }, +// wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), +// }, +// } +// for _, tt := range tests { +// t.Run(tt.name, func(t *testing.T) { +// for tab, vdr := range tt.expectedVDiffRequests { +// env.tmc.expectVDiffRequest(tab, vdr) +// } +// got, err := env.ws.VDiffStop(ctx, tt.req) +// if tt.wantErr != "" { +// require.EqualError(t, err, tt.wantErr) +// } else { +// require.NoError(t, err) +// require.NotNil(t, got) +// } +// env.tmc.confirmVDiffRequests(t) +// }) +// } +// } func TestMoveTablesComplete(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) diff --git a/go/vt/vtctl/workflow/vdiff.go b/go/vt/vtctl/workflow/vdiff.go index 6be5fe3c3b5..30953868b0b 100644 --- a/go/vt/vtctl/workflow/vdiff.go +++ b/go/vt/vtctl/workflow/vdiff.go @@ -17,16 +17,26 @@ limitations under the License. package workflow import ( + "context" "encoding/json" "math" "sort" "strings" "time" + "github.com/google/uuid" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + vttimepb "vitess.io/vitess/go/vt/proto/vttime" ) // TableSummary aggregates the current state of the table diff from all shards. @@ -278,3 +288,284 @@ func BuildProgressReport(rowsCompared int64, rowsToCompare int64, startedAt stri } return report } + +// VDiffCreate is part of the vtctlservicepb.VtctldServer interface. +// It passes on the request to the target primary tablets that are +// participating in the given workflow and VDiff. +func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffCreate") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("source_cells", req.SourceCells) + span.Annotate("target_cells", req.TargetCells) + span.Annotate("tablet_types", req.TabletTypes) + span.Annotate("tables", req.Tables) + span.Annotate("auto_retry", req.AutoRetry) + span.Annotate("max_diff_duration", req.MaxDiffDuration) + if req.AutoStart != nil { + span.Annotate("auto_start", req.GetAutoStart()) + } + + var err error + req.Uuid = strings.TrimSpace(req.Uuid) + if req.Uuid == "" { // Generate a UUID + req.Uuid = uuid.New().String() + } else { // Validate UUID if provided + if err = uuid.Validate(req.Uuid); err != nil { + return nil, vterrors.Wrapf(err, "invalid UUID provided: %s", req.Uuid) + } + } + + tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference) + + if req.Limit == 0 { // This would produce no useful results + req.Limit = math.MaxInt64 + } + // This is a pointer so there's no ZeroValue in the message + // and an older v18 client will not provide it. + if req.MaxDiffDuration == nil { + req.MaxDiffDuration = &vttimepb.Duration{} + } + // The other vttime.Duration vars should not be nil as the + // client should always provide them, but we check anyway to + // be safe. + if req.FilteredReplicationWaitTime == nil { + // A value of 0 is not valid as the vdiff will never succeed. + req.FilteredReplicationWaitTime = &vttimepb.Duration{ + Seconds: int64(DefaultTimeout.Seconds()), + } + } + if req.WaitUpdateInterval == nil { + req.WaitUpdateInterval = &vttimepb.Duration{} + } + + autoStart := true + if req.AutoStart != nil { + autoStart = req.GetAutoStart() + } + + options := &tabletmanagerdatapb.VDiffOptions{ + PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{ + TabletTypes: tabletTypesStr, + SourceCell: strings.Join(req.SourceCells, ","), + TargetCell: strings.Join(req.TargetCells, ","), + }, + CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{ + Tables: strings.Join(req.Tables, ","), + AutoRetry: req.AutoRetry, + MaxRows: req.Limit, + TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds, + MaxExtraRowsToCompare: req.MaxExtraRowsToCompare, + UpdateTableStats: req.UpdateTableStats, + MaxDiffSeconds: req.MaxDiffDuration.Seconds, + AutoStart: &autoStart, + }, + ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{ + OnlyPks: req.OnlyPKs, + DebugQuery: req.DebugQuery, + MaxSampleRows: req.MaxReportSampleRows, + RowDiffColumnTruncateAt: req.RowDiffColumnTruncateAt, + }, + } + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.CreateAction), + Options: options, + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + if ts.frozen { + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s", + req.TargetKeyspace, req.Workflow) + } + + workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running { + s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus) + return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, + "not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow) + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff create action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffCreateResponse{ + UUID: req.Uuid, + }, nil +} + +// VDiffDelete is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffDelete") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("argument", req.Arg) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.DeleteAction), + ActionArg: req.Arg, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff delete action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffDeleteResponse{}, nil +} + +// VDiffResume is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume") + defer span.Finish() + + targetShards := req.GetTargetShards() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("target_shards", targetShards) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.ResumeAction), + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + if len(targetShards) > 0 { + if err := applyTargetShards(ts, targetShards); err != nil { + return nil, err + } + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff resume action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffResumeResponse{}, nil +} + +// VDiffShow is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow") + defer span.Finish() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("argument", req.Arg) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.ShowAction), + ActionArg: req.Arg, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + output := &vdiffOutput{ + responses: make(map[string]*tabletmanagerdatapb.VDiffResponse, len(ts.targets)), + err: nil, + } + output.err = ts.ForAllTargets(func(target *MigrationTarget) error { + resp, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + output.mu.Lock() + defer output.mu.Unlock() + output.responses[target.GetShard().ShardName()] = resp + return err + }) + if output.err != nil { + s.Logger().Errorf("Error executing vdiff show action: %v", output.err) + return nil, output.err + } + return &vtctldatapb.VDiffShowResponse{ + TabletResponses: output.responses, + }, nil +} + +// VDiffStop is part of the vtctlservicepb.VtctldServer interface. +func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error) { + span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffStop") + defer span.Finish() + + targetShards := req.GetTargetShards() + + span.Annotate("keyspace", req.TargetKeyspace) + span.Annotate("workflow", req.Workflow) + span.Annotate("uuid", req.Uuid) + span.Annotate("target_shards", targetShards) + + tabletreq := &tabletmanagerdatapb.VDiffRequest{ + Keyspace: req.TargetKeyspace, + Workflow: req.Workflow, + Action: string(vdiff.StopAction), + VdiffUuid: req.Uuid, + } + + ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow) + if err != nil { + return nil, err + } + + if len(targetShards) > 0 { + if err := applyTargetShards(ts, targetShards); err != nil { + return nil, err + } + } + + err = ts.ForAllTargets(func(target *MigrationTarget) error { + _, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq) + return err + }) + if err != nil { + s.Logger().Errorf("Error executing vdiff stop action: %v", err) + return nil, err + } + + return &vtctldatapb.VDiffStopResponse{}, nil +} diff --git a/go/vt/vtctl/workflow/vdiff_test.go b/go/vt/vtctl/workflow/vdiff_test.go index e5578afc170..0da4a3ef480 100644 --- a/go/vt/vtctl/workflow/vdiff_test.go +++ b/go/vt/vtctl/workflow/vdiff_test.go @@ -17,13 +17,21 @@ limitations under the License. package workflow import ( + "context" + "fmt" "math" "testing" "time" + "github.com/google/uuid" "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff" + + querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) func TestBuildProgressReport(t *testing.T) { @@ -134,3 +142,350 @@ func TestBuildProgressReport(t *testing.T) { }) } } + +// TestVDiffCreate performs some basic tests of the VDiffCreate function +// to ensure that it behaves as expected given a specific request. +func TestVDiffCreate(t *testing.T) { + ctx := context.Background() + workflowName := "wf1" + sourceKeyspace := &testKeyspace{ + KeyspaceName: "source", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "target", + ShardNames: []string{"-80", "80-"}, + } + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + tests := []struct { + name string + req *vtctldatapb.VDiffCreateRequest + wantErr string + }{ + { + name: "no values", + req: &vtctldatapb.VDiffCreateRequest{}, + // We did not provide any keyspace or shard. + wantErr: "FindAllShardsInKeyspace() invalid keyspace name: UnescapeID err: invalid input identifier ''", + }, + { + name: "generated UUID", + req: &vtctldatapb.VDiffCreateRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflowName, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.wantErr == "" { + env.tmc.expectVRQueryResultOnKeyspaceTablets(targetKeyspace.KeyspaceName, &queryResult{ + query: "select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in (1) and id in (select max(id) from _vt.copy_state where vrepl_id in (1) group by vrepl_id, table_name)", + result: &querypb.QueryResult{}, + }) + } + got, err := env.ws.VDiffCreate(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + return + } + require.NoError(t, err) + require.NotNil(t, got) + // Ensure that we always use a valid UUID. + err = uuid.Validate(got.UUID) + require.NoError(t, err) + }) + } +} + +func TestVDiffResume(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.ResumeAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffResumeRequest // vtctld requests + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests + wantErr string + }{ + { + name: "basic resume", // Both target shards + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "resume on first shard", + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: targetKeyspace.ShardNames[:1], + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "resume on invalid shard", + req: &vtctldatapb.VDiffResumeRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: []string{"0"}, + Workflow: workflow, + Uuid: uuid, + }, + wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffResume(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + env.tmc.confirmVDiffRequests(t) + }) + } +} + +func TestVDiffStop(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.StopAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffStopRequest // vtctld requests + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse // tablet requests + wantErr string + }{ + { + name: "basic stop", // Both target shards + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "stop on first shard", + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: targetKeyspace.ShardNames[:1], + Workflow: workflow, + Uuid: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + VdiffUuid: uuid, + }, + }, + }, + }, + { + name: "stop on invalid shard", + req: &vtctldatapb.VDiffStopRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + TargetShards: []string{"0"}, + Workflow: workflow, + Uuid: uuid, + }, + wantErr: fmt.Sprintf("specified target shard 0 not a valid target for workflow %s", workflow), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffStop(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + env.tmc.confirmVDiffRequests(t) + }) + } +} + +func TestVDiffDelete(t *testing.T) { + ctx := context.Background() + sourceKeyspace := &testKeyspace{ + KeyspaceName: "sourceks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + workflow := "testwf" + uuid := uuid.New().String() + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.strict = true + action := string(vdiff.DeleteAction) + + tests := []struct { + name string + req *vtctldatapb.VDiffDeleteRequest + expectedVDiffRequests map[*topodatapb.Tablet]*vdiffRequestResponse + wantErr string + }{ + { + name: "basic delete", + req: &vtctldatapb.VDiffDeleteRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Arg: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + }, + }, + { + name: "invalid delete", + req: &vtctldatapb.VDiffDeleteRequest{ + TargetKeyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Arg: uuid, + }, + expectedVDiffRequests: map[*topodatapb.Tablet]*vdiffRequestResponse{ + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + err: fmt.Errorf("error on invalid delete"), + }, + env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+tabletUIDStep]: { + req: &tabletmanagerdatapb.VDiffRequest{ + Keyspace: targetKeyspace.KeyspaceName, + Workflow: workflow, + Action: action, + ActionArg: uuid, + }, + }, + }, + wantErr: "error on invalid delete", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + for tab, vdr := range tt.expectedVDiffRequests { + env.tmc.expectVDiffRequest(tab, vdr) + } + got, err := env.ws.VDiffDelete(ctx, tt.req) + if tt.wantErr != "" { + require.EqualError(t, err, tt.wantErr) + } else { + require.NoError(t, err) + require.NotNil(t, got) + } + env.tmc.confirmVDiffRequests(t) + }) + } +}