Skip to content

Commit

Permalink
Changes from self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jan 3, 2025
1 parent e1832b2 commit c158a65
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 45 deletions.
8 changes: 6 additions & 2 deletions go/cmd/vtctldclient/command/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,20 @@ var (
Args: cobra.ExactArgs(1),
RunE: commandGetPermissions,
}
// ValidatePermissionsShard makes a ValidatePermissionsKeyspace gRPC call to a
// vtctld with the specified shard to examine in the keyspace.
ValidatePermissionsShard = &cobra.Command{
Use: "ValidatePermissionsShard <keyspace/shard>",
Short: "Validates that the permissions on primary match all the replicas.",
Short: "Validates that the permissions on the primary match all of the replicas.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
RunE: commandValidatePermissionsShard,
}
// ValidatePermissionsKeyspace makes a ValidatePermissionsKeyspace gRPC call to a
// vtctld.
ValidatePermissionsKeyspace = &cobra.Command{
Use: "ValidatePermissionsKeyspace <keyspace name>",
Short: "Validates that the permissions on primary of the first shard match those of all of the other tablets in the keyspace.",
Short: "Validates that the permissions on the primary of the first shard match those of all of the other tablets in the keyspace.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
RunE: commandValidatePermissionsKeyspace,
Expand Down
12 changes: 6 additions & 6 deletions go/cmd/vtctldclient/command/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ For --sql, semi-colons and repeated values may be mixed, for example:
Args: cobra.ExactArgs(1),
RunE: commandValidateSchemaKeyspace,
}
// ValidateSchemaShard makes a ValidateSchemaKeyspace gRPC call to a vtctld WITH
// 1 specific shard to examine in the keyspace.
// ValidateSchemaShard makes a ValidateSchemaKeyspace gRPC call to a vtctld with
// the specified shard to examine in the keyspace.
ValidateSchemaShard = &cobra.Command{
Use: "ValidateSchemaShard [--exclude-tables=<exclude_tables>] [--include-views] [--skip-no-primary] [--include-vschema] <keyspace/shard>",
Short: "Validates that the schema on the primary tablet for the specified shard matches the schema on all other tablets in that shard.",
Expand Down Expand Up @@ -190,13 +190,13 @@ var copySchemaShardOptions = struct {
}{}

func commandCopySchemaShard(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

destKeyspace, destShard, err := topoproto.ParseKeyspaceShard(cmd.Flags().Arg(1))
if err != nil {
return err
}

cli.FinishedParsing(cmd)

var sourceTabletAlias *topodatapb.TabletAlias
sourceKeyspace, sourceShard, err := topoproto.ParseKeyspaceShard(cmd.Flags().Arg(0))
if err == nil {
Expand Down Expand Up @@ -377,9 +377,9 @@ var validateSchemaKeyspaceOptions = struct {
func commandValidateSchemaKeyspace(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

ks := cmd.Flags().Arg(0)
keyspace := cmd.Flags().Arg(0)
resp, err := client.ValidateSchemaKeyspace(commandCtx, &vtctldatapb.ValidateSchemaKeyspaceRequest{
Keyspace: ks,
Keyspace: keyspace,
ExcludeTables: validateSchemaKeyspaceOptions.ExcludeTables,
IncludeVschema: validateSchemaKeyspaceOptions.IncludeVSchema,
SkipNoPrimary: validateSchemaKeyspaceOptions.SkipNoPrimary,
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtctldclient/command/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func commandWriteTopologyPath(cmd *cobra.Command, args []string) error {
}
data, err := os.ReadFile(file)
if err != nil {
return fmt.Errorf("failed to read %s: %v", file, err)
return fmt.Errorf("failed to read file %s: %v", file, err)
}
_, err = conn.Update(cmd.Context(), path, data, nil)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion go/cmd/vtctldclient/plugin_grpctmclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ limitations under the License.

package main

// Imports and register the gRPC tabletmanager client
// Imports and registers the gRPC tabletmanager client.
// This is needed when --server=internal as the vtctldclient
// binary will then not only need to talk to the topo server
// directly but it will also need to talk to tablets directly
// via tmclient.

import (
_ "vitess.io/vitess/go/vt/vttablet/grpctmclient"
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,25 +1292,25 @@ func (cluster *LocalProcessCluster) NewVttabletInstance(tabletType string, UID i
func (cluster *LocalProcessCluster) NewVTOrcProcess(config VTOrcConfiguration) *VTOrcProcess {
base := VtProcessInstance("vtorc", "vtorc", cluster.TopoProcess.Port, cluster.Hostname)
return &VTOrcProcess{
VtProcess: *base,
VtProcess: base,
LogDir: cluster.TmpDirectory,
Config: config,
Port: cluster.GetAndReservePort(),
}
}

// VtctldClientProcessInstance returns a VtctldProcess handle for vtctldclient process
// configured with the given Config.
// VtctldClientProcessInstance returns a VtctldProcess handle for a
// vtctldclient process configured with the given Config.
func (cluster *LocalProcessCluster) NewVtctldClientProcessInstance(hostname string, grpcPort int, tmpDirectory string) *VtctldClientProcess {
version, err := GetMajorVersion("vtctldclient")
if err != nil {
log.Warningf("failed to get major vtctldclient version; interop with CLI changes for VEP-4 may not work: %s", err)
log.Warningf("failed to get major vtctldclient version; interop with CLI changes for VEP-4 may not work: %v", err)
}

base := VtProcessInstance("vtctldclient", "vtctldclient", cluster.TopoProcess.Port, cluster.Hostname)

vtctldclient := &VtctldClientProcess{
VtProcess: *base,
VtProcess: base,
Server: fmt.Sprintf("%s:%d", hostname, grpcPort),
TempDirectory: tmpDirectory,
VtctldClientMajorVersion: version,
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/cluster/vt_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type VtProcess struct {

// VtProcessInstance returns a VtProcess handle configured with the given Config.
// The process must be manually started by calling setup()
func VtProcessInstance(name, binary string, topoPort int, hostname string) *VtProcess {
func VtProcessInstance(name, binary string, topoPort int, hostname string) VtProcess {
// Default values for etcd2 topo server.
topoImplementation := "etcd2"
topoRootPath := "/"
Expand All @@ -49,7 +49,7 @@ func VtProcessInstance(name, binary string, topoPort int, hostname string) *VtPr
topoRootPath = ""
}

vt := &VtProcess{
vt := VtProcess{
Name: name,
Binary: binary,
TopoImplementation: topoImplementation,
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtbackup_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func VtbackupProcessInstance(tabletUID int, mysqlPort int, newInitDBFile string,
cell string, hostname string, tmpDirectory string, topoPort int, initialBackup bool) *VtbackupProcess {
base := VtProcessInstance("vtbackup", "vtbackup", topoPort, hostname)
vtbackup := &VtbackupProcess{
VtProcess: *base,
VtProcess: base,
LogDir: tmpDirectory,
Directory: os.Getenv("VTDATAROOT"),
BackupStorageImplementation: "file",
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtctld_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (vtctld *VtctldProcess) TearDown() error {
func VtctldProcessInstance(httpPort int, grpcPort int, topoPort int, hostname string, tmpDirectory string) *VtctldProcess {
base := VtProcessInstance("vtctld", "vtctld", topoPort, hostname)
vtctld := &VtctldProcess{
VtProcess: *base,
VtProcess: base,
ServiceMap: "grpc-vtctl,grpc-vtctld",
BackupStorageImplementation: "file",
FileBackupStorageRoot: path.Join(os.Getenv("VTDATAROOT"), "/backups"),
Expand Down
9 changes: 6 additions & 3 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func VtctldClientProcessInstance(grpcPort int, topoPort int, hostname string, tm

base := VtProcessInstance("vtctldclient", "vtctldclient", topoPort, hostname)
vtctldclient := &VtctldClientProcess{
VtProcess: *base,
VtProcess: base,
Server: fmt.Sprintf("%s:%d", hostname, grpcPort),
TempDirectory: tmpDirectory,
VtctldClientMajorVersion: version,
Expand Down Expand Up @@ -116,6 +116,8 @@ func (vtctldclient *VtctldClientProcess) ExecuteCommandWithOutput(args ...string
}

// AddCellInfo executes the vtctldclient command to add cell info.
// It uses --server=internal as there may not yet be a vtctld running
// as we need to create a cell for vtctld to use first.
func (vtctldclient *VtctldClientProcess) AddCellInfo(Cell string) error {
args := []string{
"--server", "internal",
Expand Down Expand Up @@ -340,8 +342,9 @@ func (vtctldclient *VtctldClientProcess) OnlineDDLShow(keyspace, workflow string
)
}

// shouldRetry tells us if the command should be retried based on the results/output -- meaning that it
// is likely an ephemeral or recoverable issue that is likely to succeed when retried.
// shouldRetry tells us if the command should be retried based on the results/output
// -- meaning that it is likely an ephemeral or recoverable issue that is likely to
// succeed when retried.
func shouldRetry(cmdResults string) bool {
return strings.Contains(cmdResults, "Deadlock found when trying to get lock; try restarting transaction")
}
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func VtgateProcessInstance(
) *VtgateProcess {
base := VtProcessInstance("vtgate", "vtgate", topoPort, hostname)
vtgate := &VtgateProcess{
VtProcess: *base,
VtProcess: base,
FileToLogQueries: path.Join(tmpDirectory, "/vtgate_querylog.txt"),
ConfigFile: path.Join(tmpDirectory, fmt.Sprintf("vtgate-config-%d.json", port)),
Directory: os.Getenv("VTDATAROOT"),
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtorc_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (orc *VTOrcProcess) Setup() (err error) {
--config config/vtorc/default.json --alsologtostderr
*/
orc.proc = exec.Command(
"vtorc",
orc.Binary,
"--topo_implementation", orc.TopoImplementation,
"--topo_global_server_address", orc.TopoGlobalAddress,
"--topo_global_root", orc.TopoGlobalRoot,
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func (vttablet *VttabletProcess) IsShutdown() bool {
func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspace string, vtctldPort int, tabletType string, topoPort int, hostname, tmpDirectory string, extraArgs []string, charset string) *VttabletProcess {
base := VtProcessInstance("vttablet", "vttablet", topoPort, hostname)
vttablet := &VttabletProcess{
VtProcess: *base,
VtProcess: base,
FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)),
Cell: cell,
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func ErsIgnoreTablet(clusterInstance *cluster.LocalProcessCluster, tab *cluster.
return clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args...)
}

// ErsWithVtctldClient runs ERS via vtctldclient binary
// ErsWithVtctldClient runs ERS via a vtctldclient binary.
func ErsWithVtctldClient(clusterInstance *cluster.LocalProcessCluster) (string, error) {
args := []string{"EmergencyReparentShard", fmt.Sprintf("%s/%s", KeyspaceName, ShardName)}
return clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args...)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (vc *VitessCluster) StartVTOrc() error {
}
base := cluster.VtProcessInstance("vtorc", "vtorc", vc.ClusterConfig.topoPort, vc.ClusterConfig.hostname)
vtorcProcess := &cluster.VTOrcProcess{
VtProcess: *base,
VtProcess: base,
LogDir: vc.ClusterConfig.tmpDir,
Config: cluster.VTOrcConfiguration{},
Port: vc.ClusterConfig.vtorcPort,
Expand Down
20 changes: 11 additions & 9 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ func insertInitialDataIntoExternalCluster(t *testing.T, conn *mysql.Conn) {
})
}

// TestMigrateUnsharded runs an e2e test for importing from an external cluster using the vtctldclient Mount and Migrate commands.
// We have an anti-pattern in Vitess: vt executables look for an environment variable VTDATAROOT for certain cluster parameters
// like the log directory when they are created. Until this test we just needed a single cluster for e2e tests.
// However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and
// hence the VTDATAROOT env variable gets overwritten.
// Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT
// TestMigrateUnsharded runs an e2e test for importing from an external cluster using the
// vtctldclient Mount and Migrate commands.We have an anti-pattern in Vitess: vt executables
// look for an environment variable VTDATAROOT for certain cluster parameters like the log
// directory when they are created. Until this test we just needed a single cluster for e2e
// tests. However now we need to create an external Vitess cluster. For this we need a
// different VTDATAROOT and hence the VTDATAROOT env variable gets overwritten. Each time
// we need to create vt processes in the "other" cluster we need to set the appropriate
// VTDATAROOT.
func TestMigrateUnsharded(t *testing.T) {
vc = NewVitessCluster(t, nil)
defer vc.TearDown()
Expand Down Expand Up @@ -188,9 +190,9 @@ func TestMigrateUnsharded(t *testing.T) {
})
}

// TestMigrateSharded adds a test for a sharded cluster to validate a fix for a bug where the target keyspace name
// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external"
// cluster with keyspace rating.
// TestMigrateSharded adds a test for a sharded cluster to validate a fix for a bug where
// the target keyspace name doesn't match that of the source cluster. The test migrates
// from a cluster with keyspace customer to an "external" cluster with keyspace rating.
func TestMigrateSharded(t *testing.T) {
setSidecarDBName("_vt")
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
Expand Down
22 changes: 13 additions & 9 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,10 @@ func (s *VtctldServer) CopySchemaShard(ctx context.Context, req *vtctldatapb.Cop
return nil, err
}

return &vtctldatapb.CopySchemaShardResponse{}, s.ws.CopySchemaShard(ctx,
req.SourceTabletAlias, req.Tables, req.ExcludeTables, req.IncludeViews, req.DestinationKeyspace, req.DestinationShard,
waitReplicasTimeout, req.SkipVerify)
err = s.ws.CopySchemaShard(ctx, req.SourceTabletAlias, req.Tables, req.ExcludeTables, req.IncludeViews,
req.DestinationKeyspace, req.DestinationShard, waitReplicasTimeout, req.SkipVerify)

return &vtctldatapb.CopySchemaShardResponse{}, err
}

// CreateKeyspace is part of the vtctlservicepb.VtctldServer interface.
Expand Down Expand Up @@ -4736,7 +4737,7 @@ func (s *VtctldServer) ValidatePermissionsKeyspace(ctx context.Context, req *vtc
}

if len(shards) == 0 {
return nil, fmt.Errorf("no shards in keyspace %v", req.Keyspace)
return nil, fmt.Errorf("no shards found in keyspace %s", req.Keyspace)
}
sort.Strings(shards)

Expand All @@ -4758,7 +4759,7 @@ func (s *VtctldServer) ValidatePermissionsKeyspace(ctx context.Context, req *vtc
}
referencePermissions := pres.Permissions

// Then diff the first tablet with all others.
// Then diff the first/reference tablet with all the others.
eg, egctx := errgroup.WithContext(ctx)
for _, shard := range shards {
eg.Go(func() error {
Expand All @@ -4770,15 +4771,16 @@ func (s *VtctldServer) ValidatePermissionsKeyspace(ctx context.Context, req *vtc
if topoproto.TabletAliasEqual(alias, si.PrimaryAlias) {
continue
}
log.Infof("Gathering permissions for %v", topoproto.TabletAliasString(alias))
log.Infof("Gathering permissions for %s", topoproto.TabletAliasString(alias))
presp, err := s.GetPermissions(ctx, &vtctldatapb.GetPermissionsRequest{
TabletAlias: alias,
})
if err != nil {
return err
}

log.Infof("Diffing permissions for %s", topoproto.TabletAliasString(alias))
log.Infof("Diffing permissions between %s and %s", topoproto.TabletAliasString(referenceAlias),
topoproto.TabletAliasString(alias))
er := &concurrency.AllErrorRecorder{}
tmutils.DiffPermissions(topoproto.TabletAliasString(referenceAlias), referencePermissions,
topoproto.TabletAliasString(alias), presp.Permissions, er)
Expand All @@ -4792,11 +4794,13 @@ func (s *VtctldServer) ValidatePermissionsKeyspace(ctx context.Context, req *vtc
if err := eg.Wait(); err != nil {
return nil, fmt.Errorf("permissions diffs: %v", err)
}

return &vtctldatapb.ValidatePermissionsKeyspaceResponse{}, nil
}

// ValidateSchemaKeyspace is a part of the vtctlservicepb.VtctldServer interface.
// It will diff the schema from all the tablets in the keyspace.
// It will diff the schema between the tablets in all shards -- or a subset if
// any specific shards are specified -- within the keyspace.
func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldatapb.ValidateSchemaKeyspaceRequest) (resp *vtctldatapb.ValidateSchemaKeyspaceResponse, err error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.ValidateSchemaKeyspace")
defer span.Finish()
Expand All @@ -4819,7 +4823,7 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat
// Otherwise we look at all the shards in the keyspace.
shards, err = s.ts.GetShardNames(ctx, keyspace)
if err != nil {
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShardNames(%v) failed: %v", req.Keyspace, err))
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShardNames(%s) failed: %v", req.Keyspace, err))
err = nil
return resp, err
}
Expand Down

0 comments on commit c158a65

Please sign in to comment.