Skip to content

Commit

Permalink
Add delete flag for externalize command to continue old behaviour
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <noblemittal@outlook.com>
  • Loading branch information
beingnoble03 committed Jan 8, 2025
1 parent 983a415 commit 54753a4
Show file tree
Hide file tree
Showing 9 changed files with 1,360 additions and 1,202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ var (

externalizeOptions = struct {
Keyspace string
Delete bool
}{}

internalizeOptions = struct {
Expand Down Expand Up @@ -178,7 +179,7 @@ var (
// externalize makes a LookupVindexExternalize call to a vtctld.
externalize = &cobra.Command{
Use: "externalize",
Short: "Externalize the Lookup Vindex. If the Vindex has an owner the VReplication workflow will also be stopped.",
Short: "Externalize the Lookup Vindex. If the Vindex has an owner the VReplication workflow will also be stopped/deleted.",
Example: `vtctldclient --server localhost:15999 LookupVindex --name corder_lookup_vdx --table-keyspace customer externalize`,
SilenceUsage: true,
DisableFlagsInUseLine: true,
Expand Down Expand Up @@ -252,6 +253,8 @@ func commandComplete(cmd *cobra.Command, args []string) error {
output := fmt.Sprintf("LookupVindex %s has been completed", baseOptions.Name)
if resp.WorkflowDeleted {
output = output + fmt.Sprintf(" and the %s VReplication workflow has been deleted", baseOptions.Name)
} else {
output = output + ". The VReplication workflow hasn't been stopped as the vindex is unowned."
}
fmt.Println(output)

Expand Down Expand Up @@ -295,6 +298,8 @@ func commandExternalize(cmd *cobra.Command, args []string) error {
Name: baseOptions.Name,
// Where the lookup table and VReplication workflow were created.
TableKeyspace: baseOptions.TableKeyspace,
// Delete the workflow after externalizing, instead of stopping.
DeleteWorkflow: externalizeOptions.Delete,
})

if err != nil {
Expand All @@ -303,7 +308,9 @@ func commandExternalize(cmd *cobra.Command, args []string) error {

output := fmt.Sprintf("LookupVindex %s has been externalized", baseOptions.Name)
if resp.WorkflowStopped {
output = output + fmt.Sprintf(" and the %s VReplication workflow has been stopped", baseOptions.Name)
output = output + " and the VReplication workflow has been stopped."
} else if resp.WorkflowDeleted {
output = output + " and the VReplication workflow has been deleted."
}
fmt.Println(output)

Expand All @@ -330,7 +337,7 @@ func commandInternalize(cmd *cobra.Command, args []string) error {

output := fmt.Sprintf("LookupVindex %s has been internalized", baseOptions.Name)
if resp.WorkflowStarted {
output = output + fmt.Sprintf(" and the %s VReplication workflow has been started", baseOptions.Name)
output = output + " and the VReplication workflow has been started."
}
fmt.Println(output)

Expand Down Expand Up @@ -394,6 +401,7 @@ func registerCommands(root *cobra.Command) {
// vindex has an owner as the lookup vindex will then be
// managed by VTGate.
externalize.Flags().StringVar(&externalizeOptions.Keyspace, "keyspace", "", "The keyspace containing the Lookup Vindex. If no value is specified then the table-keyspace will be used.")
externalize.Flags().BoolVar(&externalizeOptions.Delete, "delete", false, "Delete the VReplication workflow after externalizing LookupVindex, instead of stopping (default false).")
base.AddCommand(externalize)

internalize.Flags().StringVar(&internalizeOptions.Keyspace, "keyspace", "", "The keyspace containing the Lookup Vindex. If no value is specified then the table-keyspace will be used.")
Expand Down
2,244 changes: 1,134 additions & 1,110 deletions go/vt/proto/vtctldata/vtctldata.pb.go

Large diffs are not rendered by default.

68 changes: 68 additions & 0 deletions go/vt/proto/vtctldata/vtctldata_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 41 additions & 0 deletions go/vt/vtctl/workflow/lookup_vindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tmclient"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
Expand Down Expand Up @@ -540,3 +541,43 @@ func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (stri
}
return "", fmt.Errorf("column %s not found in schema %v", sourceVindexCol, lines)
}

func (lv *lookupVindex) validateExternalized(ctx context.Context, vindex *vschemapb.Vindex, name string, targetShards []*topo.ShardInfo) error {
if _, ok := vindex.Params["write_only"]; ok {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "write_only param found in vindex %s", name)
}

err := forAllShards(targetShards, func(targetShard *topo.ShardInfo) error {
targetPrimary, err := lv.ts.GetTablet(ctx, targetShard.PrimaryAlias)
if err != nil {
return err
}
res, err := lv.tmc.ReadVReplicationWorkflow(ctx, targetPrimary.Tablet, &tabletmanagerdatapb.ReadVReplicationWorkflowRequest{
Workflow: name,
})
if err != nil {
return err
}
if res == nil || res.Workflow == "" {
return vterrors.Errorf(vtrpcpb.Code_NOT_FOUND, "workflow %s not found on %v", name, targetPrimary.Alias)
}
for _, stream := range res.Streams {
if vindex.Owner == "" {
// If there's no owner, all streams need to be running.
if stream.State != binlogdatapb.VReplicationWorkflowState_Running {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not in Running state: %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State)
}
} else {
// If there's an owner, all streams need to be frozen.
if stream.State != binlogdatapb.VReplicationWorkflowState_Stopped || stream.Message != Frozen {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "stream %d for %v.%v is not frozen: %v, %v", stream.Id, targetShard.Keyspace(), targetShard.ShardName(), stream.State, stream.Message)
}
}
}
return nil
})
if err != nil {
return err
}
return nil
}
Loading

0 comments on commit 54753a4

Please sign in to comment.