diff --git a/go/test/endtoend/backup/vtbackup/backup_only_test.go b/go/test/endtoend/backup/vtbackup/backup_only_test.go
index 3f5389d2726..ecb04741d7b 100644
--- a/go/test/endtoend/backup/vtbackup/backup_only_test.go
+++ b/go/test/endtoend/backup/vtbackup/backup_only_test.go
@@ -167,7 +167,7 @@ func firstBackupTest(t *testing.T, tabletType string) {
 	mysqlctl.CompressionEngineName = "lz4"
 	defer func() { mysqlctl.CompressionEngineName = "pgzip" }()
 	// now bring up the other replica, letting it restore from backup.
-	err = localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shardName)
+	err = localCluster.InitTablet(replica2, keyspaceName, shardName)
 	require.Nil(t, err)
 	restore(t, replica2, "replica", "SERVING")
 	// Replica2 takes time to serve. Sleeping for 5 sec.
@@ -266,7 +266,7 @@ func removeBackups(t *testing.T) {
 func initTablets(t *testing.T, startTablet bool, initShardPrimary bool) {
 	// Initialize tablets
 	for _, tablet := range []cluster.Vttablet{*primary, *replica1} {
-		err := localCluster.VtctlclientProcess.InitTablet(&tablet, cell, keyspaceName, hostname, shardName)
+		err := localCluster.InitTablet(&tablet, keyspaceName, shardName)
 		require.Nil(t, err)
 		if startTablet {
diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
index c20ab70e652..7fc872e934c 100644
--- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go
+++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go
@@ -228,13 +228,13 @@ func LaunchCluster(setupType int, streamMode string, stripes int, cDetails *Comp
 	replica2 = shard.Vttablets[2]
 	replica3 = shard.Vttablets[3]
-	if err := localCluster.VtctlclientProcess.InitTablet(primary, cell, keyspaceName, hostname, shard.Name); err != nil {
+	if err := localCluster.InitTablet(primary, keyspaceName, shard.Name); err != nil {
 		return 1, err
 	}
-	if err := localCluster.VtctlclientProcess.InitTablet(replica1, cell, keyspaceName, hostname, shard.Name); err != nil {
+	if err := localCluster.InitTablet(replica1, keyspaceName, shard.Name); err != nil {
 		return 1, err
 	}
-	if err := localCluster.VtctlclientProcess.InitTablet(replica2, cell, keyspaceName, hostname, shard.Name); err != nil {
+	if err := localCluster.InitTablet(replica2, keyspaceName, shard.Name); err != nil {
 		return 1, err
 	}
 	vtctldClientProcess := cluster.VtctldClientProcessInstance("localhost", localCluster.VtctldProcess.GrpcPort, localCluster.TmpDirectory)
@@ -449,7 +449,7 @@ func primaryBackup(t *testing.T) {
 	}()
 	verifyInitialReplication(t)
-	output, err := localCluster.VtctlclientProcess.ExecuteCommandWithOutput("Backup", primary.Alias)
+	output, err := localCluster.VtctldClientProcess.ExecuteCommandWithOutput("Backup", primary.Alias)
 	require.Error(t, err)
 	assert.Contains(t, output, "type PRIMARY cannot take backup. if you really need to do this, rerun the backup command with --allow_primary")
@@ -746,7 +746,7 @@ func restartPrimaryAndReplica(t *testing.T) {
 		proc.Wait()
 	}
 	for _, tablet := range []*cluster.Vttablet{primary, replica1} {
-		err := localCluster.VtctlclientProcess.InitTablet(tablet, cell, keyspaceName, hostname, shardName)
+		err := localCluster.InitTablet(tablet, keyspaceName, shardName)
 		require.Nil(t, err)
 		err = tablet.VttabletProcess.Setup()
 		require.Nil(t, err)
diff --git a/go/test/endtoend/cluster/cluster_process.go b/go/test/endtoend/cluster/cluster_process.go
index 98218bcf3fb..0233b2ac1ea 100644
--- a/go/test/endtoend/cluster/cluster_process.go
+++ b/go/test/endtoend/cluster/cluster_process.go
@@ -38,7 +38,6 @@ import (
 	"time"
 	"vitess.io/vitess/go/constants/sidecar"
-	"vitess.io/vitess/go/json2"
 	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/sqltypes"
 	"vitess.io/vitess/go/syscallutil"
@@ -319,6 +318,44 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
 	return nil
 }
+// InitTablet initializes a tablet record in the topo server. It does not start the tablet process.
+func (cluster *LocalProcessCluster) InitTablet(tablet *Vttablet, keyspace string, shard string) error {
+	tabletpb := &topodatapb.Tablet{
+		Alias: &topodatapb.TabletAlias{
+			Cell: tablet.Cell,
+			Uid:  uint32(tablet.TabletUID),
+		},
+		Hostname: cluster.Hostname,
+		Type:     topodatapb.TabletType_REPLICA,
+		PortMap: map[string]int32{
+			"vt": int32(tablet.HTTPPort),
+		},
+		Keyspace: keyspace,
+		Shard:    shard,
+	}
+
+	switch tablet.Type {
+	case "rdonly":
+		tabletpb.Type = topodatapb.TabletType_RDONLY
+	case "primary":
+		tabletpb.Type = topodatapb.TabletType_PRIMARY
+	}
+
+	if tablet.MySQLPort > 0 {
+		tabletpb.PortMap["mysql"] = int32(tablet.MySQLPort)
+	}
+
+	if tablet.GrpcPort > 0 {
+		tabletpb.PortMap["grpc"] = int32(tablet.GrpcPort)
+	}
+
+	allowPrimaryOverride := false
+	createShardAndKeyspace := true
+	allowUpdate := true
+
+	return cluster.TopoProcess.Server.InitTablet(context.Background(), tabletpb, allowPrimaryOverride, createShardAndKeyspace, allowUpdate)
+}
+
 // StartKeyspace starts required number of shard and the corresponding tablets
 // keyspace : struct containing keyspace name, Sqlschema to apply, VSchema to apply
 // shardName : list of shard names
@@ -856,7 +893,7 @@ func (cluster *LocalProcessCluster) ExecOnTablet(ctx context.Context, vttablet *
 		return nil, err
 	}
-	tablet, err := cluster.VtctlclientGetTablet(vttablet)
+	tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
 	if err != nil {
 		return nil, err
 	}
@@ -899,7 +936,7 @@ func (cluster *LocalProcessCluster) ExecOnVTGate(ctx context.Context, addr strin
 // returns the responses. It returns an error if the stream ends with fewer than
 // `count` responses.
 func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vttablet *Vttablet, count int) (responses []*querypb.StreamHealthResponse, err error) {
-	tablet, err := cluster.VtctlclientGetTablet(vttablet)
+	tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
 	if err != nil {
 		return nil, err
 	}
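(Editorial aside: a minimal usage sketch for the new `LocalProcessCluster.InitTablet` helper added above, assuming a test that already has a `localCluster` and an allocated replica tablet; the variable names are illustrative, not part of this change.)

```go
// Sketch only: localCluster (*cluster.LocalProcessCluster), replica (*cluster.Vttablet),
// keyspaceName and shardName are assumed to exist in the calling test.
// InitTablet writes the tablet record directly into the topo server; the
// vttablet process itself is still started separately.
if err := localCluster.InitTablet(replica, keyspaceName, shardName); err != nil {
	t.Fatalf("InitTablet failed: %v", err)
}
if err := replica.VttabletProcess.Setup(); err != nil {
	t.Fatalf("vttablet start failed: %v", err)
}
```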
@@ -934,7 +971,7 @@ func (cluster *LocalProcessCluster) StreamTabletHealth(ctx context.Context, vtta
 // StreamTabletHealthUntil invokes a HealthStream on a local cluster Vttablet and
 // returns the responses. It waits until a certain condition is met. The amount of time to wait is an input that it takes.
 func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context, vttablet *Vttablet, timeout time.Duration, condition func(shr *querypb.StreamHealthResponse) bool) error {
-	tablet, err := cluster.VtctlclientGetTablet(vttablet)
+	tablet, err := cluster.VtctldClientProcess.GetTablet(vttablet.Alias)
 	if err != nil {
 		return err
 	}
@@ -971,25 +1008,6 @@ func (cluster *LocalProcessCluster) StreamTabletHealthUntil(ctx context.Context,
 	return err
 }
-func (cluster *LocalProcessCluster) VtctlclientGetTablet(tablet *Vttablet) (*topodatapb.Tablet, error) {
-	result, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("GetTablet", "--", tablet.Alias)
-	if err != nil {
-		return nil, err
-	}
-
-	var ti topodatapb.Tablet
-	if err := json2.Unmarshal([]byte(result), &ti); err != nil {
-		return nil, err
-	}
-
-	return &ti, nil
-}
-
-func (cluster *LocalProcessCluster) VtctlclientChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
-	_, err := cluster.VtctlclientProcess.ExecuteCommandWithOutput("ChangeTabletType", "--", tablet.Alias, tabletType.String())
-	return err
-}
-
 // Teardown brings down the cluster by invoking teardown for individual processes
 func (cluster *LocalProcessCluster) Teardown() {
 	PanicHandler(nil)
diff --git a/go/test/endtoend/cluster/cluster_util.go b/go/test/endtoend/cluster/cluster_util.go
index 9fcefba3892..5d7869a421e 100644
--- a/go/test/endtoend/cluster/cluster_util.go
+++ b/go/test/endtoend/cluster/cluster_util.go
@@ -36,7 +36,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	"vitess.io/vitess/go/json2"
 	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/vt/vtgate/vtgateconn"
@@ -360,7 +359,11 @@ func GetPasswordUpdateSQL(localCluster *LocalProcessCluster) string {
 // CheckSrvKeyspace confirms that the cell and keyspace contain the expected
 // shard mappings.
 func CheckSrvKeyspace(t *testing.T, cell string, ksname string, expectedPartition map[topodatapb.TabletType][]string, ci LocalProcessCluster) {
-	srvKeyspace := GetSrvKeyspace(t, cell, ksname, ci)
+	srvKeyspaces, err := ci.VtctldClientProcess.GetSrvKeyspaces(ksname, cell)
+	require.NoError(t, err)
+
+	srvKeyspace := srvKeyspaces[cell]
+	require.NotNil(t, srvKeyspace, "srvKeyspace is nil for %s", cell)
 	currentPartition := map[topodatapb.TabletType][]string{}
@@ -374,17 +377,6 @@ func CheckSrvKeyspace(t *testing.T, cell string, ksname string, expectedPartitio
 	assert.True(t, reflect.DeepEqual(currentPartition, expectedPartition))
 }
-// GetSrvKeyspace returns the SrvKeyspace structure for the cell and keyspace.
-func GetSrvKeyspace(t *testing.T, cell string, ksname string, ci LocalProcessCluster) *topodatapb.SrvKeyspace {
-	output, err := ci.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, ksname)
-	require.Nil(t, err)
-	var srvKeyspace topodatapb.SrvKeyspace
-
-	err = json2.Unmarshal([]byte(output), &srvKeyspace)
-	require.Nil(t, err)
-	return &srvKeyspace
-}
-
 // ExecuteOnTablet executes a query on the specified vttablet.
 // It should always be called with a primary tablet for a keyspace/shard.
 func ExecuteOnTablet(t *testing.T, query string, vttablet Vttablet, ks string, expectFail bool) {
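(Editorial aside: `CheckSrvKeyspace` above now resolves the SrvKeyspace through `VtctldClientProcess.GetSrvKeyspaces`; a small sketch of that lookup in isolation, with `ci`, `ksname` and `cell` standing in for whatever the calling test provides.)

```go
// Sketch: fetch the SrvKeyspace for one cell via vtctldclient instead of
// hand-parsing GetSrvKeyspace JSON output.
srvKeyspaces, err := ci.VtctldClientProcess.GetSrvKeyspaces(ksname, cell)
require.NoError(t, err)

srvKeyspace := srvKeyspaces[cell]
require.NotNil(t, srvKeyspace, "no srvkeyspace for cell %s", cell)

for _, partition := range srvKeyspace.Partitions {
	t.Logf("tablet type %v serves %d shard(s)", partition.ServedType, len(partition.ShardReferences))
}
```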
diff --git a/go/test/endtoend/cluster/topo_process.go b/go/test/endtoend/cluster/topo_process.go
index 776ed7da27e..d5d5c8482a0 100644
--- a/go/test/endtoend/cluster/topo_process.go
+++ b/go/test/endtoend/cluster/topo_process.go
@@ -33,6 +33,11 @@ import (
 	"vitess.io/vitess/go/vt/log"
 	vtopo "vitess.io/vitess/go/vt/topo"
+
+	// Register topo server implementations
+	_ "vitess.io/vitess/go/vt/topo/consultopo"
+	_ "vitess.io/vitess/go/vt/topo/etcd2topo"
+	_ "vitess.io/vitess/go/vt/topo/zk2topo"
 )
 // TopoProcess is a generic handle for a running Topo service .
 type TopoProcess struct {
@@ -51,6 +56,7 @@ type TopoProcess struct {
 	PeerURL string
 	ZKPorts string
 	Client  interface{}
+	Server  *vtopo.Server
 	proc *exec.Cmd
 	exit chan error
@@ -60,15 +66,22 @@ type TopoProcess struct {
 func (topo *TopoProcess) Setup(topoFlavor string, cluster *LocalProcessCluster) (err error) {
 	switch topoFlavor {
 	case "zk2":
-		return topo.SetupZookeeper(cluster)
+		err = topo.SetupZookeeper(cluster)
 	case "consul":
-		return topo.SetupConsul(cluster)
+		err = topo.SetupConsul(cluster)
 	default:
 		// Override any inherited ETCDCTL_API env value to
 		// ensure that we use the v3 API and storage.
 		os.Setenv("ETCDCTL_API", "3")
-		return topo.SetupEtcd()
+		err = topo.SetupEtcd()
 	}
+
+	if err != nil {
+		return
+	}
+
+	topo.Server, err = vtopo.OpenServer(topoFlavor, net.JoinHostPort(topo.Host, fmt.Sprintf("%d", topo.Port)), TopoGlobalRoot(topoFlavor))
+	return
 }
 // SetupEtcd spawns a new etcd service and initializes it with the defaults.
@@ -289,6 +302,11 @@ func (topo *TopoProcess) SetupConsul(cluster *LocalProcessCluster) (err error) {
 // TearDown shutdowns the running topo service.
 func (topo *TopoProcess) TearDown(Cell string, originalVtRoot string, currentRoot string, keepdata bool, topoFlavor string) error {
+	if topo.Server != nil {
+		topo.Server.Close()
+		topo.Server = nil
+	}
+
 	if topo.Client != nil {
 		switch cli := topo.Client.(type) {
 		case *clientv3.Client:
@@ -437,3 +455,13 @@ func TopoProcessInstance(port int, peerPort int, hostname string, flavor string,
 	topo.PeerURL = fmt.Sprintf("http://%s:%d", hostname, peerPort)
 	return topo
 }
+
+// TopoGlobalRoot returns the global root for the given topo flavor.
+func TopoGlobalRoot(flavor string) string {
+	switch flavor {
+	case "consul":
+		return "global"
+	default:
+		return "/vitess/global"
+	}
+}
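(Editorial aside: with `TopoProcess` now holding an open `*topo.Server`, tests can read topo records directly instead of shelling out. A rough sketch, assuming a running `localCluster`; `GetTablet` and `AliasString` are standard `go/vt/topo` API, not introduced by this diff, and the alias values are placeholders.)

```go
// Sketch: read a tablet record straight from the test topo server.
alias := &topodatapb.TabletAlias{Cell: "zone1", Uid: 100} // placeholder alias
ti, err := localCluster.TopoProcess.Server.GetTablet(context.Background(), alias)
if err != nil {
	t.Fatalf("GetTablet from topo failed: %v", err)
}
t.Logf("tablet %s has type %s", ti.AliasString(), ti.Type)
```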
diff --git a/go/test/endtoend/cluster/vtctl_process.go b/go/test/endtoend/cluster/vtctl_process.go
index 9b3d1a5f4e1..b9d8a5b46ce 100644
--- a/go/test/endtoend/cluster/vtctl_process.go
+++ b/go/test/endtoend/cluster/vtctl_process.go
@@ -118,7 +118,6 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
 	// Default values for etcd2 topo server.
 	topoImplementation := "etcd2"
-	topoGlobalRoot := "/vitess/global"
 	topoRootPath := "/"
 	// Checking and resetting the parameters for required topo server.
@@ -127,7 +126,6 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
 		topoImplementation = "zk2"
 	case "consul":
 		topoImplementation = "consul"
-		topoGlobalRoot = "global"
 		// For consul we do not need "/" in the path
 		topoRootPath = ""
 	}
@@ -142,7 +140,7 @@ func VtctlProcessInstance(topoPort int, hostname string) *VtctlProcess {
 		Binary:             "vtctl",
 		TopoImplementation: topoImplementation,
 		TopoGlobalAddress:  fmt.Sprintf("%s:%d", hostname, topoPort),
-		TopoGlobalRoot:     topoGlobalRoot,
+		TopoGlobalRoot:     TopoGlobalRoot(*topoFlavor),
 		TopoServerAddress:  fmt.Sprintf("%s:%d", hostname, topoPort),
 		TopoRootPath:       topoRootPath,
 		VtctlMajorVersion:  version,
diff --git a/go/test/endtoend/cluster/vtctldclient_process.go b/go/test/endtoend/cluster/vtctldclient_process.go
index c5afd8f1220..57cb0cc4f45 100644
--- a/go/test/endtoend/cluster/vtctldclient_process.go
+++ b/go/test/endtoend/cluster/vtctldclient_process.go
@@ -152,6 +152,28 @@ func (vtctldclient *VtctldClientProcess) ApplyVSchema(keyspace string, json stri
 	)
 }
+// ChangeTabletType changes the type of the given tablet.
+func (vtctldclient *VtctldClientProcess) ChangeTabletType(tablet *Vttablet, tabletType topodatapb.TabletType) error {
+	return vtctldclient.ExecuteCommand(
+		"ChangeTabletType",
+		tablet.Alias,
+		tabletType.String(),
+	)
+}
+
+// GetShardReplication returns a mapping of cell to shard replication for the given keyspace and shard.
+func (vtctldclient *VtctldClientProcess) GetShardReplication(keyspace string, shard string, cells ...string) (map[string]*topodatapb.ShardReplication, error) {
+	args := append([]string{"GetShardReplication", keyspace + "/" + shard}, cells...)
+	out, err := vtctldclient.ExecuteCommandWithOutput(args...)
+	if err != nil {
+		return nil, err
+	}
+
+	var resp vtctldatapb.GetShardReplicationResponse
+	err = json2.Unmarshal([]byte(out), &resp)
+	return resp.ShardReplicationByCell, err
+}
+
 // GetSrvKeyspaces returns a mapping of cell to srv keyspace for the given keyspace.
 func (vtctldclient *VtctldClientProcess) GetSrvKeyspaces(keyspace string, cells ...string) (ksMap map[string]*topodatapb.SrvKeyspace, err error) {
 	args := append([]string{"GetSrvKeyspaces", keyspace}, cells...)
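(Editorial aside: a short usage sketch for the two typed `VtctldClientProcess` helpers added above; `clusterInstance`, `keyspaceName`, `shardName`, `cell` and `tablet` are assumed to come from the calling test, and the expected node count is only an example.)

```go
// Sketch: typed helpers instead of parsing raw vtctlclient text output.
replication, err := clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceName, shardName, cell)
require.NoError(t, err)
require.NotNil(t, replication[cell])
assert.Len(t, replication[cell].Nodes, 3) // e.g. one primary plus two replicas

// Flip a tablet to SPARE using the topodata enum rather than a string.
err = clusterInstance.VtctldClientProcess.ChangeTabletType(tablet, topodatapb.TabletType_SPARE)
require.NoError(t, err)
```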
diff --git a/go/test/endtoend/clustertest/vtctld_test.go b/go/test/endtoend/clustertest/vtctld_test.go
index c1b341ccd73..bb1bcdf2237 100644
--- a/go/test/endtoend/clustertest/vtctld_test.go
+++ b/go/test/endtoend/clustertest/vtctld_test.go
@@ -84,7 +84,7 @@ func testTopoDataAPI(t *testing.T, url string) {
 func testListAllTablets(t *testing.T) {
 	// first w/o any filters, aside from cell
-	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets", clusterInstance.Cell)
+	result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets", "--cell", clusterInstance.Cell)
 	require.NoError(t, err)
 	tablets := getAllTablets()
@@ -102,10 +102,12 @@ func testListAllTablets(t *testing.T) {
 	// now filtering with the first keyspace and tablet type of primary, in
 	// addition to the cell
-	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
-		"ListAllTablets", "--", "--keyspace", clusterInstance.Keyspaces[0].Name,
-		"--tablet_type", "primary",
-		clusterInstance.Cell)
+	result, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
+		"GetTablets",
+		"--keyspace", clusterInstance.Keyspaces[0].Name,
+		"--tablet-type", "primary",
+		"--cell", clusterInstance.Cell,
+	)
 	require.NoError(t, err)
 	// We should only return a single primary tablet per shard in the first keyspace
@@ -164,7 +166,7 @@ func testExecuteAsDba(t *testing.T) {
 	}
 	for _, tcase := range tcases {
 		t.Run(tcase.query, func(t *testing.T) {
-			result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
+			result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
 			if tcase.expectErr {
 				assert.Error(t, err)
 			} else {
@@ -176,7 +178,7 @@ func testExecuteAsDba(t *testing.T) {
 }
 func testExecuteAsApp(t *testing.T) {
-	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
+	result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsApp", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
 	require.NoError(t, err)
 	assert.Equal(t, result, oneTableOutput)
 }
diff --git a/go/test/endtoend/keyspace/keyspace_test.go b/go/test/endtoend/keyspace/keyspace_test.go
index 7f7d4198135..2a665c66214 100644
--- a/go/test/endtoend/keyspace/keyspace_test.go
+++ b/go/test/endtoend/keyspace/keyspace_test.go
@@ -18,7 +18,6 @@ package sequence
 import (
 	"encoding/binary"
-	"encoding/json"
 	"flag"
 	"os"
 	"testing"
@@ -211,12 +210,9 @@ func TestGetSrvKeyspacePartitions(t *testing.T) {
 func TestShardNames(t *testing.T) {
 	defer cluster.PanicHandler(t)
-	output, err := clusterForKSTest.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, keyspaceShardedName)
-	require.Nil(t, err)
-	var srvKeyspace topodatapb.SrvKeyspace
-
-	err = json.Unmarshal([]byte(output), &srvKeyspace)
-	require.Nil(t, err)
+	output, err := clusterForKSTest.VtctldClientProcess.GetSrvKeyspaces(keyspaceShardedName, cell)
+	require.NoError(t, err)
+	require.NotNil(t, output[cell], "no srvkeyspace for cell %s", cell)
 }
 func TestGetKeyspace(t *testing.T) {
@@ -229,7 +225,11 @@ func TestDeleteKeyspace(t *testing.T) {
 	defer cluster.PanicHandler(t)
 	_ = clusterForKSTest.VtctldClientProcess.CreateKeyspace("test_delete_keyspace", sidecar.DefaultName)
 	_ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace/0")
-	_ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--", "--keyspace=test_delete_keyspace", "--shard=0", "zone1-0000000100", "primary")
+	_ = clusterForKSTest.InitTablet(&cluster.Vttablet{
+		Type:      "primary",
+		TabletUID: 100,
+		Cell:      "zone1",
+	}, "test_delete_keyspace", "0")
 	// Can't delete keyspace if there are shards present.
 	err := clusterForKSTest.VtctldClientProcess.ExecuteCommand("DeleteKeyspace", "test_delete_keyspace")
@@ -247,13 +247,18 @@ func TestDeleteKeyspace(t *testing.T) {
 	// Start over and this time use recursive DeleteKeyspace to do everything.
 	_ = clusterForKSTest.VtctldClientProcess.CreateKeyspace("test_delete_keyspace", sidecar.DefaultName)
 	_ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace/0")
-	_ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--", "--port=1234", "--bind-address=127.0.0.1", "--keyspace=test_delete_keyspace", "--shard=0", "zone1-0000000100", "primary")
+	_ = clusterForKSTest.InitTablet(&cluster.Vttablet{
+		Type:      "primary",
+		TabletUID: 100,
+		Cell:      "zone1",
+		HTTPPort:  1234,
+	}, "test_delete_keyspace", "0")
 	// Create the serving/replication entries and check that they exist,
 	// so we can later check they're deleted.
 	_ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace")
-	_ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", cell, "test_delete_keyspace/0")
-	_ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspaces", "test_delete_keyspace", cell)
+	_, _ = clusterForKSTest.VtctldClientProcess.GetShardReplication("test_delete_keyspace", "0", cell)
+	_ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", cell, "test_delete_keyspace")
 	// Recursive DeleteKeyspace
 	_ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("DeleteKeyspace", "--recursive", "test_delete_keyspace")
@@ -265,7 +270,9 @@ func TestDeleteKeyspace(t *testing.T) {
 	require.Error(t, err)
 	err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetTablet", "zone1-0000000100")
 	require.Error(t, err)
-	err = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", cell, "test_delete_keyspace/0")
+	_, err = clusterForKSTest.VtctldClientProcess.GetShardReplication("test_delete_keyspace", "0", cell)
+	require.Error(t, err)
+	err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", cell, "test_delete_keyspace")
 	require.Error(t, err)
 	ksMap, err := clusterForKSTest.VtctldClientProcess.GetSrvKeyspaces("test_delete_keyspace", cell)
 	require.NoError(t, err)
@@ -419,11 +426,11 @@ func packKeyspaceID(keyspaceID uint64) []byte {
 }
 func getSrvKeyspace(t *testing.T, cell string, ksname string) *topodatapb.SrvKeyspace {
-	output, err := clusterForKSTest.VtctlclientProcess.ExecuteCommandWithOutput("GetSrvKeyspace", cell, ksname)
-	require.Nil(t, err)
-	var srvKeyspace topodatapb.SrvKeyspace
+	output, err := clusterForKSTest.VtctldClientProcess.GetSrvKeyspaces(ksname, cell)
+	require.NoError(t, err)
-	err = json.Unmarshal([]byte(output), &srvKeyspace)
-	require.Nil(t, err)
-	return &srvKeyspace
+	srvKeyspace := output[cell]
+	require.NotNil(t, srvKeyspace, "no srvkeyspace for cell %s", cell)
+
+	return srvKeyspace
 }
diff --git a/go/test/endtoend/recovery/pitr/shardedpitr_test.go b/go/test/endtoend/recovery/pitr/shardedpitr_test.go
index 0aed6573337..03fcf76b07c 100644
--- a/go/test/endtoend/recovery/pitr/shardedpitr_test.go
+++ b/go/test/endtoend/recovery/pitr/shardedpitr_test.go
@@ -22,18 +22,21 @@ import (
 	"os"
 	"os/exec"
 	"path"
+	"strings"
 	"testing"
 	"time"
-	"github.com/buger/jsonparser"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"vitess.io/vitess/go/constants/sidecar"
+	"vitess.io/vitess/go/json2"
 	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/test/endtoend/cluster"
 	"vitess.io/vitess/go/test/endtoend/utils"
 	"vitess.io/vitess/go/vt/log"
+
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
 )
 var (
@@ -305,7 +308,7 @@ func performResharding(t *testing.T) {
 	shard0Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, sidecar.DefaultName, waitTimeout)
 	shard1Primary.VttabletProcess.WaitForVReplicationToCatchup(t, "ks.reshardWorkflow", dbName, sidecar.DefaultName, waitTimeout)
-	waitForNoWorkflowLag(t, clusterInstance, "ks.reshardWorkflow")
+	waitForNoWorkflowLag(t, clusterInstance, "ks", "reshardWorkflow")
 	err = clusterInstance.VtctldClientProcess.ExecuteCommand("Reshard", "SwitchTraffic", "--tablet-types=rdonly", "--target-keyspace", "ks", "--workflow", "reshardWorkflow")
 	require.NoError(t, err)
@@ -573,22 +576,26 @@ func launchRecoveryTablet(t *testing.T, tablet *cluster.Vttablet, binlogServer *
 // waitForNoWorkflowLag waits for the VReplication workflow's MaxVReplicationTransactionLag
 // value to be 0.
-func waitForNoWorkflowLag(t *testing.T, vc *cluster.LocalProcessCluster, ksWorkflow string) {
-	lag := int64(0)
+func waitForNoWorkflowLag(t *testing.T, vc *cluster.LocalProcessCluster, ks string, workflow string) {
+	var lag int64
 	timer := time.NewTimer(defaultTimeout)
 	defer timer.Stop()
 	for {
-		output, err := vc.VtctlclientProcess.ExecuteCommandWithOutput("Workflow", "--", ksWorkflow, "show")
+		output, err := vc.VtctldClientProcess.ExecuteCommandWithOutput("Workflow", "--keyspace", ks, "show", "--workflow", workflow)
 		require.NoError(t, err)
-		lag, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")
+
+		var resp vtctldatapb.GetWorkflowsResponse
+		err = json2.Unmarshal([]byte(output), &resp)
 		require.NoError(t, err)
+		require.GreaterOrEqual(t, len(resp.Workflows), 1, "response should have at least one workflow")
+		lag = resp.Workflows[0].MaxVReplicationTransactionLag
 		if lag == 0 {
 			return
 		}
 		select {
 		case <-timer.C:
 			require.FailNow(t, fmt.Sprintf("workflow %q did not eliminate VReplication lag before the timeout of %s; last seen MaxVReplicationTransactionLag: %d",
-				ksWorkflow, defaultTimeout, lag))
+				strings.Join([]string{ks, workflow}, "."), defaultTimeout, lag))
 		default:
 			time.Sleep(defaultTick)
 		}
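(Editorial aside: the `Workflow ... show` parsing introduced above reappears in sharded_buffer_test.go further down; a sketch of a shared helper it could be extracted into, using only the calls already shown in this diff.)

```go
// Sketch: return MaxVReplicationTransactionLag for one workflow via vtctldclient.
func workflowLag(t *testing.T, vc *cluster.LocalProcessCluster, keyspace, workflow string) int64 {
	output, err := vc.VtctldClientProcess.ExecuteCommandWithOutput(
		"Workflow", "--keyspace", keyspace, "show", "--workflow", workflow)
	require.NoError(t, err)

	var resp vtctldatapb.GetWorkflowsResponse
	err = json2.Unmarshal([]byte(output), &resp)
	require.NoError(t, err)
	require.GreaterOrEqual(t, len(resp.Workflows), 1, "response should have at least one workflow")

	return resp.Workflows[0].MaxVReplicationTransactionLag
}
```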
diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go
index 790fd0028e2..fb782e69ea4 100644
--- a/go/test/endtoend/reparent/utils/utils.go
+++ b/go/test/endtoend/reparent/utils/utils.go
@@ -510,7 +510,7 @@ func RestartTablet(t *testing.T, clusterInstance *cluster.LocalProcessCluster, t
 	tab.MysqlctlProcess.InitMysql = false
 	err := tab.MysqlctlProcess.Start()
 	require.NoError(t, err)
-	err = clusterInstance.VtctlclientProcess.InitTablet(tab, tab.Cell, KeyspaceName, Hostname, ShardName)
+	err = clusterInstance.InitTablet(tab, KeyspaceName, ShardName)
 	require.NoError(t, err)
 }
@@ -519,7 +519,7 @@ func ResurrectTablet(ctx context.Context, t *testing.T, clusterInstance *cluster
 	tab.MysqlctlProcess.InitMysql = false
 	err := tab.MysqlctlProcess.Start()
 	require.NoError(t, err)
-	err = clusterInstance.VtctlclientProcess.InitTablet(tab, tab.Cell, KeyspaceName, Hostname, ShardName)
+	err = clusterInstance.InitTablet(tab, KeyspaceName, ShardName)
 	require.NoError(t, err)
 	// As there is already a primary the new replica will come directly in SERVING state
@@ -561,7 +561,7 @@ func GetNewPrimary(t *testing.T, clusterInstance *cluster.LocalProcessCluster) *
 // GetShardReplicationPositions gets the shards replication positions.
 // This should not generally be called directly, instead use the WaitForReplicationToCatchup method.
 func GetShardReplicationPositions(t *testing.T, clusterInstance *cluster.LocalProcessCluster, keyspaceName, shardName string, doPrint bool) []string {
-	output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(
+	output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
 		"ShardReplicationPositions", fmt.Sprintf("%s/%s", keyspaceName, shardName))
 	require.NoError(t, err)
 	strArray := strings.Split(output, "\n")
@@ -608,12 +608,23 @@ func CheckReplicaStatus(ctx context.Context, t *testing.T, tablet *cluster.Vttab
 // CheckReparentFromOutside checks that cluster was reparented from outside
 func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, downPrimary bool, baseTime int64) {
-	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell1, KeyspaceShard)
-	require.Nil(t, err, "error should be Nil")
-	if !downPrimary {
-		assertNodeCount(t, result, int(3))
+	if clusterInstance.VtctlMajorVersion > 19 { // TODO: (ajm188) remove else clause after next release
+		result, err := clusterInstance.VtctldClientProcess.GetShardReplication(KeyspaceName, ShardName, cell1)
+		require.Nil(t, err, "error should be Nil")
+		require.NotNil(t, result[cell1], "result should not be nil")
+		if !downPrimary {
+			assert.Len(t, result[cell1].Nodes, 3)
+		} else {
+			assert.Len(t, result[cell1].Nodes, 2)
+		}
 	} else {
-		assertNodeCount(t, result, int(2))
+		result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell1, KeyspaceShard)
+		require.Nil(t, err, "error should be Nil")
+		if !downPrimary {
+			assertNodeCount(t, result, int(3))
+		} else {
+			assertNodeCount(t, result, int(2))
+		}
 	}
 	// make sure the primary status page says it's the primary
@@ -622,7 +633,7 @@ func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProces
 	// make sure the primary health stream says it's the primary too
 	// (health check is disabled on these servers, force it first)
-	err = clusterInstance.VtctldClientProcess.ExecuteCommand("RunHealthCheck", tablet.Alias)
+	err := clusterInstance.VtctldClientProcess.ExecuteCommand("RunHealthCheck", tablet.Alias)
 	require.NoError(t, err)
 	shrs, err := clusterInstance.StreamTabletHealth(context.Background(), tablet, 1)
@@ -633,6 +644,16 @@ func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProces
 	assert.True(t, streamHealthResponse.PrimaryTermStartTimestamp >= baseTime)
 }
+func assertNodeCount(t *testing.T, result string, want int) {
+	resultMap := make(map[string]any)
+	err := json.Unmarshal([]byte(result), &resultMap)
+	require.NoError(t, err)
+
+	nodes := reflect.ValueOf(resultMap["nodes"])
+	got := nodes.Len()
+	assert.Equal(t, want, got)
+}
+
 // WaitForReplicationPosition waits for tablet B to catch up to the replication position of tablet A.
 func WaitForReplicationPosition(t *testing.T, tabletA *cluster.Vttablet, tabletB *cluster.Vttablet) error {
 	posA, _ := cluster.GetPrimaryPosition(t, *tabletA, Hostname)
@@ -658,16 +679,6 @@ func positionAtLeast(t *testing.T, tablet *cluster.Vttablet, a string, b string)
 	return isAtleast
 }
-func assertNodeCount(t *testing.T, result string, want int) {
-	resultMap := make(map[string]any)
-	err := json.Unmarshal([]byte(result), &resultMap)
-	require.NoError(t, err)
-
-	nodes := reflect.ValueOf(resultMap["nodes"])
-	got := nodes.Len()
-	assert.Equal(t, want, got)
-}
-
 // CheckDBvar checks the db var
 func CheckDBvar(ctx context.Context, t *testing.T, tablet *cluster.Vttablet, variable string, status string) {
 	tabletParams := getMysqlConnParam(tablet)
@@ -718,7 +729,7 @@ func CheckReplicationStatus(ctx context.Context, t *testing.T, tablet *cluster.V
 }
 func WaitForTabletToBeServing(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, timeout time.Duration) {
-	vTablet, err := clusterInstance.VtctlclientGetTablet(tablet)
+	vTablet, err := clusterInstance.VtctldClientProcess.GetTablet(tablet.Alias)
 	require.NoError(t, err)
 	tConn, err := tabletconn.GetDialer()(vTablet, false)
diff --git a/go/test/endtoend/tabletgateway/buffer/reshard/sharded_buffer_test.go b/go/test/endtoend/tabletgateway/buffer/reshard/sharded_buffer_test.go
index d58d8901165..5e439cc9fff 100644
--- a/go/test/endtoend/tabletgateway/buffer/reshard/sharded_buffer_test.go
+++ b/go/test/endtoend/tabletgateway/buffer/reshard/sharded_buffer_test.go
@@ -21,16 +21,15 @@ import (
 	"testing"
 	"time"
-	"github.com/buger/jsonparser"
-
-	"vitess.io/vitess/go/vt/log"
-
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"vitess.io/vitess/go/json2"
+	"vitess.io/vitess/go/test/endtoend/cluster"
 	"vitess.io/vitess/go/test/endtoend/tabletgateway/buffer"
+	"vitess.io/vitess/go/vt/log"
-	"vitess.io/vitess/go/test/endtoend/cluster"
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
 )
 const (
@@ -43,11 +42,16 @@ func waitForLowLag(t *testing.T, clusterInstance *cluster.LocalProcessCluster, k
 	waitDuration := 500 * time.Millisecond
 	duration := maxWait
 	for duration > 0 {
-		output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", keyspace, workflow), "Show")
+		output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("Workflow", "--keyspace", keyspace, "show", "--workflow", workflow)
 		require.NoError(t, err)
-		lagSeconds, err = jsonparser.GetInt([]byte(output), "MaxVReplicationTransactionLag")
+		var resp vtctldatapb.GetWorkflowsResponse
+		err = json2.Unmarshal([]byte(output), &resp)
 		require.NoError(t, err)
+		require.GreaterOrEqual(t, len(resp.Workflows), 1, "response should have at least one workflow")
+		lagSeconds := resp.Workflows[0].MaxVReplicationTransactionLag
+
+		require.NoError(t, err, output)
 		if lagSeconds <= acceptableLagSeconds {
 			log.Infof("waitForLowLag acceptable for workflow %s, keyspace %s, current lag is %d", workflow, keyspace, lagSeconds)
 			break
diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go
index d9cedc04b69..de4546d5d0d 100644
--- a/go/test/endtoend/tabletgateway/vtgate_test.go
+++ b/go/test/endtoend/tabletgateway/vtgate_test.go
@@ -120,11 +120,11 @@ func TestVtgateReplicationStatusCheckWithTabletTypeChange(t *testing.T) {
 	// change the RDONLY tablet to SPARE
 	rdOnlyTablet := clusterInstance.Keyspaces[0].Shards[0].Rdonly()
-	err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE)
+	err = clusterInstance.VtctldClientProcess.ChangeTabletType(rdOnlyTablet, topodata.TabletType_SPARE)
 	require.NoError(t, err)
 	// Change it back to RDONLY afterward as the cluster is re-used.
 	defer func() {
-		err = clusterInstance.VtctlclientChangeTabletType(rdOnlyTablet, topodata.TabletType_RDONLY)
+		err = clusterInstance.VtctldClientProcess.ExecuteCommand("ChangeTabletType", rdOnlyTablet.Alias, "rdonly")
 		require.NoError(t, err)
 	}()
diff --git a/go/test/endtoend/tabletmanager/commands_test.go b/go/test/endtoend/tabletmanager/commands_test.go
index d23413e0269..ca0b3b15818 100644
--- a/go/test/endtoend/tabletmanager/commands_test.go
+++ b/go/test/endtoend/tabletmanager/commands_test.go
@@ -23,15 +23,16 @@ import (
 	"reflect"
 	"testing"
-	"vitess.io/vitess/go/test/endtoend/utils"
-
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 	"github.com/tidwall/gjson"
-	"github.com/stretchr/testify/assert"
-
+	"vitess.io/vitess/go/json2"
 	"vitess.io/vitess/go/mysql"
 	"vitess.io/vitess/go/test/endtoend/cluster"
+	"vitess.io/vitess/go/test/endtoend/utils"
+
+	vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
 )
 var (
@@ -143,44 +144,39 @@ func TestHook(t *testing.T) {
 	// test a regular program works
 	defer cluster.PanicHandler(t)
 	runHookAndAssert(t, []string{
-		"ExecuteHook", "--", primaryTablet.Alias, "test.sh", "--flag1", "--param1=hello"}, "0", false, "")
+		"ExecuteHook", primaryTablet.Alias, "test.sh", "--", "--flag1", "--param1=hello"}, 0, false, "")
 	// test stderr output
 	runHookAndAssert(t, []string{
-		"ExecuteHook", "--", primaryTablet.Alias, "test.sh", "--to-stderr"}, "0", false, "ERR: --to-stderr\n")
+		"ExecuteHook", primaryTablet.Alias, "test.sh", "--", "--to-stderr"}, 0, false, "ERR: --to-stderr\n")
 	// test commands that fail
 	runHookAndAssert(t, []string{
-		"ExecuteHook", "--", primaryTablet.Alias, "test.sh", "--exit-error"}, "1", false, "ERROR: exit status 1\n")
+		"ExecuteHook", primaryTablet.Alias, "test.sh", "--", "--exit-error"}, 1, false, "ERROR: exit status 1\n")
 	// test hook that is not present
 	runHookAndAssert(t, []string{
-		"ExecuteHook", "--", primaryTablet.Alias, "not_here.sh", "--exit-error"}, "-1", false, "missing hook")
+		"ExecuteHook", primaryTablet.Alias, "not_here.sh", "--", "--exit-error"}, -1, false, "missing hook")
 	// test hook with invalid name
 	runHookAndAssert(t, []string{
-		"ExecuteHook", "--", primaryTablet.Alias, "/bin/ls"}, "-1", true, "hook name cannot have")
+		"ExecuteHook", primaryTablet.Alias, "/bin/ls"}, -1, true, "hook name cannot have")
 }
-func runHookAndAssert(t *testing.T, params []string, expectedStatus string, expectedError bool, expectedStderr string) {
-
-	hr, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput(params...)
+func runHookAndAssert(t *testing.T, params []string, expectedStatus int64, expectedError bool, expectedStderr string) {
+	hr, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(params...)
 	if expectedError {
 		assert.Error(t, err, "Expected error")
 	} else {
 		require.Nil(t, err)
-		resultMap := make(map[string]any)
-		err = json.Unmarshal([]byte(hr), &resultMap)
+		var resp vtctldatapb.ExecuteHookResponse
+		err = json2.Unmarshal([]byte(hr), &resp)
 		require.Nil(t, err)
-		exitStatus := reflect.ValueOf(resultMap["ExitStatus"]).Float()
-		status := fmt.Sprintf("%.0f", exitStatus)
-		assert.Equal(t, expectedStatus, status)
-
-		stderr := reflect.ValueOf(resultMap["Stderr"]).String()
-		assert.Contains(t, stderr, expectedStderr)
+		assert.Equal(t, expectedStatus, resp.HookResult.ExitStatus)
+		assert.Contains(t, resp.HookResult.Stderr, expectedStderr)
 	}
 }
@@ -188,23 +184,26 @@ func runHookAndAssert(t *testing.T, params []string, expectedStatus string, expe
 func TestShardReplicationFix(t *testing.T) {
 	// make sure the replica is in the replication graph, 2 nodes: 1 primary, 1 replica
 	defer cluster.PanicHandler(t)
-	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell, keyspaceShard)
+	result, err := clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceName, shardName, cell)
 	require.Nil(t, err, "error should be Nil")
-	assertNodeCount(t, result, int(3))
+	require.NotNil(t, result[cell], "result should not be Nil")
+	assert.Len(t, result[cell].Nodes, 3)
 	// Manually add a bogus entry to the replication graph, and check it is removed by ShardReplicationFix
 	err = clusterInstance.VtctldClientProcess.ExecuteCommand("ShardReplicationAdd", keyspaceShard, fmt.Sprintf("%s-9000", cell))
 	require.Nil(t, err, "error should be Nil")
-	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell, keyspaceShard)
+	result, err = clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceName, shardName, cell)
 	require.Nil(t, err, "error should be Nil")
-	assertNodeCount(t, result, int(4))
+	require.NotNil(t, result[cell], "result should not be Nil")
+	assert.Len(t, result[cell].Nodes, 4)
 	err = clusterInstance.VtctldClientProcess.ExecuteCommand("ShardReplicationFix", cell, keyspaceShard)
 	require.Nil(t, err, "error should be Nil")
-	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell, keyspaceShard)
+	result, err = clusterInstance.VtctldClientProcess.GetShardReplication(keyspaceName, shardName, cell)
 	require.Nil(t, err, "error should be Nil")
-	assertNodeCount(t, result, int(3))
+	require.NotNil(t, result[cell], "result should not be Nil")
+	assert.Len(t, result[cell].Nodes, 3)
 }
 func TestGetSchema(t *testing.T) {
@@ -220,13 +219,3 @@ func TestGetSchema(t *testing.T) {
 	v1Create := gjson.Get(res, "table_definitions.#(name==\"v1\").schema")
 	assert.Equal(t, getSchemaV1Results, v1Create.String())
 }
-
-func assertNodeCount(t *testing.T, result string, want int) {
-	resultMap := make(map[string]any)
-	err := json.Unmarshal([]byte(result), &resultMap)
-	require.Nil(t, err)
-
-	nodes := reflect.ValueOf(resultMap["nodes"])
-	got := nodes.Len()
-	assert.Equal(t, want, got)
-}
diff --git a/go/test/endtoend/topoconncache/topo_conn_cache_test.go b/go/test/endtoend/topoconncache/topo_conn_cache_test.go
index 4ffcc309e29..082ecc5717f 100644
--- a/go/test/endtoend/topoconncache/topo_conn_cache_test.go
+++ b/go/test/endtoend/topoconncache/topo_conn_cache_test.go
@@ -51,7 +51,7 @@ func TestVtctldListAllTablets(t *testing.T) {
 func testListAllTablets(t *testing.T) {
 	// first w/o any filters, aside from cell
-	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets")
+	result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets")
 	require.NoError(t, err)
 	tablets := getAllTablets()
@@ -84,7 +84,7 @@ func deleteCell(t *testing.T) {
 	clusterInstance.Keyspaces[0].Shards = []cluster.Shard{shard1, shard2}
 	// Now list all tablets
-	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets")
+	result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets")
 	require.NoError(t, err)
 	tablets := getAllTablets()
@@ -184,7 +184,7 @@ func addCellback(t *testing.T) {
 	shard2.Vttablets = append(shard2.Vttablets, shard2Replica)
 	shard2.Vttablets = append(shard2.Vttablets, shard1Rdonly)
-	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ListAllTablets")
+	result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetTablets")
 	require.NoError(t, err)
 	tablets := getAllTablets()