diff --git a/go/cmd/vtctldclient/command/topology.go b/go/cmd/vtctldclient/command/topology.go index 6aa6949341c..16a71e5555c 100644 --- a/go/cmd/vtctldclient/command/topology.go +++ b/go/cmd/vtctldclient/command/topology.go @@ -22,6 +22,7 @@ import ( "github.com/spf13/cobra" "vitess.io/vitess/go/cmd/vtctldclient/cli" + "vitess.io/vitess/go/vt/topo" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) @@ -35,12 +36,22 @@ var ( Args: cobra.ExactArgs(1), RunE: commandGetTopologyPath, } - // The version of the key/path to get. If not specified, the latest/current // version is returned. version int64 = 0 // If true, only the data is output and it is in JSON format rather than prototext. dataAsJSON bool = false + + // WriteTopologyPath makes a WriteTopologyPath gRPC call to a vtctld. + WriteTopologyPath = &cobra.Command{ + Use: "WriteTopologyPath ", + Short: "Copies a local file structure to the topology server at the given path.", + DisableFlagsInUseLine: true, + Args: cobra.ExactArgs(2), + RunE: commandWriteTopologyPath, + } + // The cell to use for the copy. Defaults to global cell. + cell string ) func commandGetTopologyPath(cmd *cobra.Command, args []string) error { @@ -75,8 +86,30 @@ func commandGetTopologyPath(cmd *cobra.Command, args []string) error { return nil } +func commandWriteTopologyPath(cmd *cobra.Command, args []string) error { + /* + path := cmd.Flags().Arg(0) + file := cmd.Flags().Arg(1) + + conn, err := ts.ConnForCell(cmd.Context(), cell) + if err != nil { + return err + } + data, err := os.ReadFile(file) + if err != nil { + return err + } + _, err = conn.Update(cmd.Context(), path, data, nil) + return err + */ + return nil +} + func init() { GetTopologyPath.Flags().Int64Var(&version, "version", version, "The version of the path's key to get. If not specified, the latest version is returned.") GetTopologyPath.Flags().BoolVar(&dataAsJSON, "data-as-json", dataAsJSON, "If true, only the data is output and it is in JSON format rather than prototext.") Root.AddCommand(GetTopologyPath) + + WriteTopologyPath.Flags().StringVar(&cell, "cell", topo.GlobalCell, "Topology server cell to copy the file to.") + Root.AddCommand(WriteTopologyPath) } diff --git a/go/test/endtoend/keyspace/keyspace_test.go b/go/test/endtoend/keyspace/keyspace_test.go index f65301b9bb4..33bf9121bc7 100644 --- a/go/test/endtoend/keyspace/keyspace_test.go +++ b/go/test/endtoend/keyspace/keyspace_test.go @@ -277,24 +277,24 @@ func TestDeleteKeyspace(t *testing.T) { // TODO: (ajm188) if this test gets fixed, the flags need to be updated to comply with VEP-4 as well. // tells that in zone2 after deleting shard, there is no shard #264 and in zone1 there is only 1 #269 /*func RemoveKeyspaceCell(t *testing.T) { - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("CreateKeyspace", "test_delete_keyspace_removekscell") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace_removekscell/0") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace_removekscell/1") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--port=1234", "--bind-address=127.0.0.1", "-keyspace=test_delete_keyspace_removekscell", "--shard=0", "zone1-0000000100", "primary") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--port=1234", "--bind-address=127.0.0.1", "-keyspace=test_delete_keyspace_removekscell", "--shard=1", "zone1-0000000101", "primary") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--port=1234", "--bind-address=127.0.0.1", "-keyspace=test_delete_keyspace_removekscell", "--shard=0", "zone2-0000000100", "replica") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--port=1234", "--bind-address=127.0.0.1", "-keyspace=test_delete_keyspace_removekscell", "--shard=1", "zone2-0000000101", "replica") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("CreateKeyspace", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace_removekscell/0") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("CreateShard", "test_delete_keyspace_removekscell/1") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("InitTablet", "--port=1234", "--bind-address=127.0.0.1", "-keyspace=test_delete_keyspace_removekscell", "--shard=0", "zone1-0000000100", "primary") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("InitTablet", "--port=1234", "--bind-address=127.0.0.1", "-keyspace=test_delete_keyspace_removekscell", "--shard=1", "zone1-0000000101", "primary") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("InitTablet", "--port=1234", "--bind-address=127.0.0.1", "-keyspace=test_delete_keyspace_removekscell", "--shard=0", "zone2-0000000100", "replica") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("InitTablet", "--port=1234", "--bind-address=127.0.0.1", "-keyspace=test_delete_keyspace_removekscell", "--shard=1", "zone2-0000000101", "replica") // Create the serving/replication entries and check that they exist, so we can later check they're deleted. - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace_removekscell") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/0") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/1") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetSrvKeyspace", "zone2", "test_delete_keyspace_removekscell") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetSrvKeyspace", "zone1", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/0") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/1") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", "zone2", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", "zone1", "test_delete_keyspace_removekscell") // Just remove the shard from one cell (including tablets), // but leaving the global records and other cells/shards alone. - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("RemoveShardCell", "--recursive", "test_delete_keyspace_removekscell/0", "zone2") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RemoveShardCell", "--recursive", "test_delete_keyspace_removekscell/0", "zone2") //Check that the shard is gone from zone2. srvKeyspaceZone2 := getSrvKeyspace(t, cell2, "test_delete_keyspace_removekscell") @@ -308,42 +308,42 @@ func TestDeleteKeyspace(t *testing.T) { assert.Equal(t, len(partition.ShardReferences), 2) } - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace_removekscell") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetKeyspace", "test_delete_keyspace_removekscell") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShard", "test_delete_keyspace_removekscell/0") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetKeyspace", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShard", "test_delete_keyspace_removekscell/0") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetTablet", "zone1-0000000100") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetTablet", "zone1-0000000100") - err := clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetTablet", "zone2-0000000100") + err := clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetTablet", "zone2-0000000100") require.Error(t, err) - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetTablet", "zone2-0000000101") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone1", "test_delete_keyspace_removekscell/0") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetTablet", "zone2-0000000101") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone1", "test_delete_keyspace_removekscell/0") - err = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/0") + err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/0") require.Error(t, err) - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/1") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetSrvKeyspace", "zone2", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/1") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetSrvKeyspace", "zone2", "test_delete_keyspace_removekscell") // Add it back to do another test. - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("InitTablet", "--port=1234", "--keyspace=test_delete_keyspace_removekscell", "--shard=0", "zone2-0000000100", "replica") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace_removekscell") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/0") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("InitTablet", "--port=1234", "--keyspace=test_delete_keyspace_removekscell", "--shard=0", "zone2-0000000100", "replica") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/0") // Now use RemoveKeyspaceCell to remove all shards. - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("RemoveKeyspaceCell", "-recursive", "test_delete_keyspace_removekscell", "zone2") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace_removekscell") - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone1", "test_delete_keyspace_removekscell/0") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RemoveKeyspaceCell", "-recursive", "test_delete_keyspace_removekscell", "zone2") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("RebuildKeyspaceGraph", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone1", "test_delete_keyspace_removekscell/0") - err = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/0") + err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/0") require.Error(t, err) - err = clusterForKSTest.VtctlclientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/1") + err = clusterForKSTest.VtctldClientProcess.ExecuteCommand("GetShardReplication", "zone2", "test_delete_keyspace_removekscell/1") require.Error(t, err) // Clean up - _ = clusterForKSTest.VtctlclientProcess.ExecuteCommand("DeleteKeyspace", "-recursive", "test_delete_keyspace_removekscell") + _ = clusterForKSTest.VtctldClientProcess.ExecuteCommand("DeleteKeyspace", "-recursive", "test_delete_keyspace_removekscell") } */ func TestShardCountForAllKeyspaces(t *testing.T) { diff --git a/go/test/endtoend/reparent/utils/utils.go b/go/test/endtoend/reparent/utils/utils.go index 2a51262557b..8425e92f971 100644 --- a/go/test/endtoend/reparent/utils/utils.go +++ b/go/test/endtoend/reparent/utils/utils.go @@ -677,7 +677,7 @@ func CheckReparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProces assert.Len(t, result[cell1].Nodes, 2) } } else { - result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("GetShardReplication", cell1, KeyspaceShard) + result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("GetShardReplication", cell1, KeyspaceShard) require.Nil(t, err, "error should be Nil") if !downPrimary { assertNodeCount(t, result, int(3)) diff --git a/go/test/endtoend/sharded/sharded_keyspace_test.go b/go/test/endtoend/sharded/sharded_keyspace_test.go index 3e5f2b3add7..dba369a2539 100644 --- a/go/test/endtoend/sharded/sharded_keyspace_test.go +++ b/go/test/endtoend/sharded/sharded_keyspace_test.go @@ -147,13 +147,13 @@ func TestShardedKeyspace(t *testing.T) { require.Nil(t, err) assert.Equal(t, `[[INT64(1) VARCHAR("test 1")]]`, fmt.Sprintf("%v", rows.Rows)) - err = clusterInstance.VtctlclientProcess.ExecuteCommand("ValidateSchemaShard", fmt.Sprintf("%s/%s", keyspaceName, shard1.Name)) + err = clusterInstance.VtctldClientProcess.ExecuteCommand("ValidateSchemaShard", fmt.Sprintf("%s/%s", keyspaceName, shard1.Name)) require.Nil(t, err) - err = clusterInstance.VtctlclientProcess.ExecuteCommand("ValidateSchemaShard", fmt.Sprintf("%s/%s", keyspaceName, shard1.Name)) + err = clusterInstance.VtctldClientProcess.ExecuteCommand("ValidateSchemaShard", fmt.Sprintf("%s/%s", keyspaceName, shard1.Name)) require.Nil(t, err) - output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ValidateSchemaKeyspace", keyspaceName) + output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ValidateSchemaKeyspace", keyspaceName) require.Error(t, err) // We should assert that there is a schema difference and that both the shard primaries are involved in it. // However, we cannot assert in which order the two primaries will occur since the underlying function does not guarantee that @@ -166,9 +166,9 @@ func TestShardedKeyspace(t *testing.T) { require.Nil(t, err) err = clusterInstance.VtctldClientProcess.ExecuteCommand("GetPermissions", shard1.Vttablets[1].Alias) require.Nil(t, err) - err = clusterInstance.VtctlclientProcess.ExecuteCommand("ValidatePermissionsShard", fmt.Sprintf("%s/%s", keyspaceName, shard1.Name)) + err = clusterInstance.VtctldClientProcess.ExecuteCommand("ValidatePermissionsShard", fmt.Sprintf("%s/%s", keyspaceName, shard1.Name)) require.Nil(t, err) - err = clusterInstance.VtctlclientProcess.ExecuteCommand("ValidatePermissionsKeyspace", keyspaceName) + err = clusterInstance.VtctldClientProcess.ExecuteCommand("ValidatePermissionsKeyspace", keyspaceName) require.Nil(t, err) rows, err = shard1Primary.VttabletProcess.QueryTablet("select id, msg from vt_select_test order by id", keyspaceName, true) diff --git a/go/test/endtoend/vreplication/fk_ext_test.go b/go/test/endtoend/vreplication/fk_ext_test.go index e17247ab46b..d6716bcbf2d 100644 --- a/go/test/endtoend/vreplication/fk_ext_test.go +++ b/go/test/endtoend/vreplication/fk_ext_test.go @@ -151,8 +151,7 @@ func TestFKExt(t *testing.T) { } sqls := strings.Split(FKExtSourceSchema, "\n") for _, sql := range sqls { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("ApplySchema", "--", - "--ddl_strategy=direct", "--sql", sql, keyspaceName) + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ApplySchema", "--ddl-strategy=direct", "--sql", sql, keyspaceName) require.NoErrorf(t, err, output) } doReshard(t, fkextConfig.target2KeyspaceName, "reshard2to3", "-80,80-", threeShards, tablets) @@ -165,8 +164,7 @@ func TestFKExt(t *testing.T) { tablets[shard] = vc.Cells[cellName].Keyspaces[keyspaceName].Shards[shard].Tablets[fmt.Sprintf("%s-%d", cellName, tabletID)].Vttablet sqls := strings.Split(FKExtSourceSchema, "\n") for _, sql := range sqls { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("ApplySchema", "--", - "--ddl_strategy=direct", "--sql", sql, keyspaceName) + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ApplySchema", "--ddl-strategy=direct", "--sql", sql, keyspaceName) require.NoErrorf(t, err, output) } doReshard(t, fkextConfig.target2KeyspaceName, "reshard3to1", threeShards, "0", tablets) @@ -254,7 +252,7 @@ func doReshard(t *testing.T, keyspace, workflowName, sourceShards, targetShards for _, targetTab := range targetTabs { catchup(t, targetTab, workflowName, "Reshard") } - vdiff(t, keyspace, workflowName, fkextConfig.cell, false, true, nil) + vdiff(t, keyspace, workflowName, fkextConfig.cell, nil) rs.SwitchReadsAndWrites() //if lg.WaitForAdditionalRows(100) != nil { // t.Fatal("WaitForAdditionalRows failed") @@ -263,7 +261,7 @@ func doReshard(t *testing.T, keyspace, workflowName, sourceShards, targetShards if compareRowCounts(t, keyspace, strings.Split(sourceShards, ","), strings.Split(targetShards, ",")) != nil { t.Fatal("Row counts do not match") } - vdiff(t, keyspace, workflowName+"_reverse", fkextConfig.cell, true, false, nil) + vdiff(t, keyspace, workflowName+"_reverse", fkextConfig.cell, nil) rs.ReverseReadsAndWrites() //if lg.WaitForAdditionalRows(100) != nil { @@ -273,7 +271,7 @@ func doReshard(t *testing.T, keyspace, workflowName, sourceShards, targetShards if compareRowCounts(t, keyspace, strings.Split(targetShards, ","), strings.Split(sourceShards, ",")) != nil { t.Fatal("Row counts do not match") } - vdiff(t, keyspace, workflowName, fkextConfig.cell, false, true, nil) + vdiff(t, keyspace, workflowName, fkextConfig.cell, nil) lg.Stop() rs.SwitchReadsAndWrites() @@ -313,12 +311,10 @@ const fkExtMaterializeSpec = ` func materializeTables(t *testing.T) { wfName := "mat" - err := vc.VtctlClient.ExecuteCommand("ApplySchema", "--", "--ddl_strategy=direct", - "--sql", FKExtMaterializeSchema, fkextConfig.target1KeyspaceName) + err := vc.VtctldClient.ExecuteCommand("ApplySchema", "--ddl-strategy=direct", "--sql", FKExtMaterializeSchema, fkextConfig.target1KeyspaceName) require.NoError(t, err, fmt.Sprintf("ApplySchema Error: %s", err)) materializeSpec := fmt.Sprintf(fkExtMaterializeSpec, "mat", fkextConfig.target2KeyspaceName, fkextConfig.target1KeyspaceName) - err = vc.VtctlClient.ExecuteCommand("Materialize", materializeSpec) - require.NoError(t, err, "Materialize") + materialize(t, materializeSpec) tab := vc.getPrimaryTablet(t, fkextConfig.target1KeyspaceName, "0") catchup(t, tab, wfName, "Materialize") validateMaterializeRowCounts(t) @@ -363,7 +359,7 @@ func doMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName, ta for _, targetTab := range targetTabs { catchup(t, targetTab, workflowName, "MoveTables") } - vdiff(t, targetKeyspace, workflowName, fkextConfig.cell, false, true, nil) + vdiff(t, targetKeyspace, workflowName, fkextConfig.cell, nil) lg.Stop() lg.SetDBStrategy("vtgate", targetKeyspace) if lg.Start() != nil { @@ -377,7 +373,7 @@ func doMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName, ta } waitForLowLag(t, sourceKeyspace, workflowName+"_reverse") - vdiff(t, sourceKeyspace, workflowName+"_reverse", fkextConfig.cell, false, true, nil) + vdiff(t, sourceKeyspace, workflowName+"_reverse", fkextConfig.cell, nil) if lg.WaitForAdditionalRows(100) != nil { t.Fatal("WaitForAdditionalRows failed") } @@ -388,7 +384,7 @@ func doMoveTables(t *testing.T, sourceKeyspace, targetKeyspace, workflowName, ta } waitForLowLag(t, targetKeyspace, workflowName) time.Sleep(5 * time.Second) - vdiff(t, targetKeyspace, workflowName, fkextConfig.cell, false, true, nil) + vdiff(t, targetKeyspace, workflowName, fkextConfig.cell, nil) lg.Stop() mt.SwitchReadsAndWrites() mt.Complete() diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index f977d5a74cd..15664be51d9 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -102,11 +102,11 @@ func TestFKWorkflow(t *testing.T) { targetTab := targetKs.Shards["0"].Tablets[fmt.Sprintf("%s-%d", cellName, targetTabletId)].Vttablet require.NotNil(t, targetTab) catchup(t, targetTab, workflowName, "MoveTables") - vdiff(t, targetKeyspace, workflowName, cellName, true, false, nil) + vdiff(t, targetKeyspace, workflowName, cellName, nil) if withLoad { ls.waitForAdditionalRows(200) } - vdiff(t, targetKeyspace, workflowName, cellName, true, false, nil) + vdiff(t, targetKeyspace, workflowName, cellName, nil) if withLoad { cancel() <-ch diff --git a/go/test/endtoend/vreplication/materialize_test.go b/go/test/endtoend/vreplication/materialize_test.go index c62099a5894..9434de9d356 100644 --- a/go/test/endtoend/vreplication/materialize_test.go +++ b/go/test/endtoend/vreplication/materialize_test.go @@ -61,7 +61,7 @@ const smMaterializeSpec = `{"workflow": "wf1", "source_keyspace": "ks1", "target const initDataQuery = `insert into ks1.tx(id, typ, val) values (1, 1, 'abc'), (2, 1, 'def'), (3, 2, 'def'), (4, 2, 'abc'), (5, 3, 'def'), (6, 3, 'abc')` // testShardedMaterialize tests a materialize workflow for a sharded cluster (single shard) using comparison filters -func testShardedMaterialize(t *testing.T, useVtctldClient bool) { +func testShardedMaterialize(t *testing.T) { var err error vc = NewVitessCluster(t, nil) ks1 := "ks1" @@ -81,7 +81,7 @@ func testShardedMaterialize(t *testing.T, useVtctldClient bool) { verifyClusterHealth(t, vc) _, err = vtgateConn.ExecuteFetch(initDataQuery, 0, false) require.NoError(t, err) - materialize(t, smMaterializeSpec, useVtctldClient) + materialize(t, smMaterializeSpec) tab := vc.getPrimaryTablet(t, ks2, "0") catchup(t, tab, "wf1", "Materialize") @@ -169,7 +169,7 @@ DETERMINISTIC RETURN id * length(val); ` -func testMaterialize(t *testing.T, useVtctldClient bool) { +func testMaterialize(t *testing.T) { var err error vc = NewVitessCluster(t, nil) sourceKs := "source" @@ -199,7 +199,7 @@ func testMaterialize(t *testing.T, useVtctldClient bool) { testMaterializeWithNonExistentTable(t) - materialize(t, smMaterializeSpec2, useVtctldClient) + materialize(t, smMaterializeSpec2) catchup(t, ks2Primary, "wf1", "Materialize") // validate data after the copy phase @@ -219,21 +219,10 @@ func testMaterialize(t *testing.T, useVtctldClient bool) { // TestMaterialize runs all the individual materialize tests defined above. func TestMaterialize(t *testing.T) { t.Run("Materialize", func(t *testing.T) { - testMaterialize(t, false) + testMaterialize(t) }) t.Run("ShardedMaterialize", func(t *testing.T) { - testShardedMaterialize(t, false) - }) -} - -// TestMaterializeVtctldClient runs all the individual materialize tests -// defined above using vtctldclient instead of vtctlclient. -func TestMaterializeVtctldClient(t *testing.T) { - t.Run("Materialize", func(t *testing.T) { - testMaterialize(t, true) - }) - t.Run("ShardedMaterialize", func(t *testing.T) { - testShardedMaterialize(t, true) + testShardedMaterialize(t) }) } @@ -315,7 +304,7 @@ func TestReferenceTableMaterialize(t *testing.T) { waitForQueryResult(t, vtgateConn, "ks2:"+shard, "select id, id2 from ref2", `[[INT64(1) INT64(1)] [INT64(2) INT64(2)] [INT64(3) INT64(3)]]`) } - vdiff(t, "ks2", "wf1", defaultCellName, false, true, nil) + vdiff(t, "ks2", "wf1", defaultCellName, nil) queries := []string{ "update ks1.ref1 set val='xyz'", @@ -332,5 +321,5 @@ func TestReferenceTableMaterialize(t *testing.T) { waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref1", 4) waitForRowCount(t, vtgateConn, "ks2:"+shard, "ref2", 4) } - vdiff(t, "ks2", "wf1", defaultCellName, false, true, nil) + vdiff(t, "ks2", "wf1", defaultCellName, nil) } diff --git a/go/test/endtoend/vreplication/migrate_test.go b/go/test/endtoend/vreplication/migrate_test.go index 2ccb3158fd9..9d40ea24d51 100644 --- a/go/test/endtoend/vreplication/migrate_test.go +++ b/go/test/endtoend/vreplication/migrate_test.go @@ -122,7 +122,7 @@ func TestVtctlMigrate(t *testing.T) { execVtgateQuery(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);") waitForRowCount(t, vtgateConn, "product:0", "rating", 3) waitForRowCount(t, vtgateConn, "product:0", "review", 4) - vdiffSideBySide(t, ksWorkflow, "extcell1") + doVDiff(t, ksWorkflow, "extcell1") if output, err = vc.VtctlClient.ExecuteCommandWithOutput("Migrate", "complete", ksWorkflow); err != nil { t.Fatalf("Migrate command failed with %+v : %s\n", err, output) @@ -256,7 +256,7 @@ func TestVtctldMigrateUnsharded(t *testing.T) { execVtgateQuery(t, extVtgateConn, "rating", "insert into rating(gid, pid, rating) values(3, 1, 3);") waitForRowCountInTablet(t, targetPrimary, "product", "rating", 3) waitForRowCountInTablet(t, targetPrimary, "product", "review", 4) - vdiffSideBySide(t, ksWorkflow, "extcell1") + doVDiff(t, ksWorkflow, "extcell1") output, err = vc.VtctldClient.ExecuteCommandWithOutput("Migrate", "--target-keyspace", "product", "--workflow", "e1", "show") diff --git a/go/test/endtoend/vreplication/movetables_buffering_test.go b/go/test/endtoend/vreplication/movetables_buffering_test.go index 7ef75390fbc..da8b9d1f96b 100644 --- a/go/test/endtoend/vreplication/movetables_buffering_test.go +++ b/go/test/endtoend/vreplication/movetables_buffering_test.go @@ -39,7 +39,7 @@ func TestMoveTablesBuffering(t *testing.T) { catchup(t, targetTab1, workflowName, "MoveTables") catchup(t, targetTab2, workflowName, "MoveTables") - vdiff(t, targetKs, workflowName, "", false, true, nil) + vdiff(t, targetKs, workflowName, "", nil) waitForLowLag(t, "customer", workflowName) for i := 0; i < 10; i++ { tstWorkflowSwitchReadsAndWrites(t) diff --git a/go/test/endtoend/vreplication/multi_tenant_test.go b/go/test/endtoend/vreplication/multi_tenant_test.go index 6bceaeefc6e..6e73303be8a 100644 --- a/go/test/endtoend/vreplication/multi_tenant_test.go +++ b/go/test/endtoend/vreplication/multi_tenant_test.go @@ -229,7 +229,7 @@ func TestMultiTenantSimple(t *testing.T) { // Create again and run it to completion. createFunc() - vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil) + vdiff(t, targetKeyspace, workflowName, defaultCellName, nil) mt.SwitchReads() confirmOnlyReadsSwitched(t) @@ -389,7 +389,7 @@ func TestMultiTenantSharded(t *testing.T) { // Note: we cannot insert into the target keyspace since that is never routed to the source keyspace. lastIndex = insertRows(lastIndex, sourceKeyspace) waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, mt.workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) - vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil) + vdiff(t, targetKeyspace, workflowName, defaultCellName, nil) mt.SwitchReadsAndWrites() // Note: here we have already switched, and we can insert into the target keyspace, and it should get reverse // replicated to the source keyspace. The source keyspace is routed to the target keyspace at this point. @@ -586,7 +586,7 @@ func (mtm *multiTenantMigration) switchTraffic(tenantId int64) { mt := mtm.getActiveMoveTables(tenantId) ksWorkflow := fmt.Sprintf("%s.%s", mtm.targetKeyspace, mt.workflowName) waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) - vdiff(t, mt.targetKeyspace, mt.workflowName, defaultCellName, false, true, nil) + vdiff(t, mt.targetKeyspace, mt.workflowName, defaultCellName, nil) mtm.insertSomeData(t, tenantId, sourceKeyspaceName, numAdditionalRowsPerTenant) mt.SwitchReadsAndWrites() mtm.insertSomeData(t, tenantId, sourceKeyspaceName, numAdditionalRowsPerTenant) diff --git a/go/test/endtoend/vreplication/partial_movetables_seq_test.go b/go/test/endtoend/vreplication/partial_movetables_seq_test.go index eec304e0a4d..596ce09cb90 100644 --- a/go/test/endtoend/vreplication/partial_movetables_seq_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_seq_test.go @@ -38,6 +38,11 @@ import ( As part of a separate cleanup we will build on this framework to replace the existing one. */ +const emptyWorkflowShowResponse = `{ + "workflows": [] +} +` + type keyspace struct { name string vschema string @@ -271,9 +276,7 @@ func (wf *workflow) complete() { // TestPartialMoveTablesWithSequences enhances TestPartialMoveTables by adding an unsharded keyspace which has a // sequence. This tests that the sequence is migrated correctly and that we can reverse traffic back to the source func TestPartialMoveTablesWithSequences(t *testing.T) { - origExtraVTGateArgs := extraVTGateArgs - extraVTGateArgs = append(extraVTGateArgs, []string{ "--enable-partial-keyspace-migration", "--schema_change_signal=false", @@ -356,8 +359,8 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { vtgateConn, closeConn = getVTGateConn() defer closeConn() - t.Run("Confirm routing rules", func(t *testing.T) { + t.Run("Confirm routing rules", func(t *testing.T) { // Global routing rules should be in place with everything going to the source keyspace (customer). confirmGlobalRoutingToSource(t) @@ -410,7 +413,6 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { defer vtgateConn.Close() t.Run("Validate shard and tablet type routing", func(t *testing.T) { - // No shard targeting _, err = vtgateConn.ExecuteFetch(shard80DashRoutedQuery, 0, false) require.Error(t, err) @@ -482,10 +484,10 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { insertCustomers(t) - output, err = tc.vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "seqTgt.seq", "show") + output, err = tc.vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", wfSeq.toKeyspace, "show", "--workflow", wfSeq.name) require.NoError(t, err) - output, err = tc.vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "seqSrc.seq_reverse", "show") + output, err = tc.vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", wfSeq.fromKeyspace, "show", "--workflow", fmt.Sprintf("%s_reverse", wfSeq.name)) require.NoError(t, err) wfSeq.complete() @@ -501,21 +503,19 @@ func TestPartialMoveTablesWithSequences(t *testing.T) { err = tstWorkflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", defaultWorkflowExecOptions) require.NoError(t, err) - output, err := tc.vc.VtctlClient.ExecuteCommandWithOutput("Workflow", fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show") - require.Error(t, err) - require.Contains(t, output, "no streams found") + output, err := tc.vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", reverseKs, "show", "--workflow", reverseWf) + require.NoError(t, err) + require.Equal(t, emptyWorkflowShowResponse, output) - // Delete the original workflow - originalKsWf := fmt.Sprintf("%s.%s", targetKs, wf) - _, err = tc.vc.VtctlClient.ExecuteCommandWithOutput("Workflow", originalKsWf, "delete") + // Be sure that we've deleted the original workflow. + _, _ = tc.vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "delete", "--workflow", wf) + output, err = tc.vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", wf) require.NoError(t, err) - output, err = tc.vc.VtctlClient.ExecuteCommandWithOutput("Workflow", originalKsWf, "show") - require.Error(t, err) - require.Contains(t, output, "no streams found") + require.Equal(t, emptyWorkflowShowResponse, output) } // Confirm that the global routing rules are now gone. - output, err = tc.vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") + output, err = tc.vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") require.NoError(t, err) require.Equal(t, emptyGlobalRoutingRules, output) @@ -564,7 +564,7 @@ func insertCustomers(t *testing.T) { } func confirmGlobalRoutingToSource(t *testing.T) { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") require.NoError(t, err) result := gjson.Get(output, "rules") result.ForEach(func(attributeKey, attributeValue gjson.Result) bool { diff --git a/go/test/endtoend/vreplication/partial_movetables_test.go b/go/test/endtoend/vreplication/partial_movetables_test.go index 7ae8f83416d..c5b4a264e2c 100644 --- a/go/test/endtoend/vreplication/partial_movetables_test.go +++ b/go/test/endtoend/vreplication/partial_movetables_test.go @@ -127,7 +127,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) catchup(t, targetTab80Dash, workflowName, "MoveTables") - vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil) + vdiff(t, targetKeyspace, workflowName, defaultCellName, nil) mt.SwitchReadsAndWrites() time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) mt.Complete() @@ -190,7 +190,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { } waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) catchup(t, targetTab80Dash, workflowName, "MoveTables") - vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil) + vdiff(t, targetKeyspace, workflowName, defaultCellName, nil) vtgateConn, closeConn := getVTGateConn() defer closeConn() @@ -199,7 +199,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { waitForRowCount(t, vtgateConn, "customer2:80-", "customer", 2) // customer2: 80- confirmGlobalRoutingToSource := func() { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") require.NoError(t, err) result := gjson.Get(output, "rules") result.ForEach(func(attributeKey, attributeValue gjson.Result) bool { @@ -307,9 +307,6 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { require.Contains(t, err.Error(), "target: customer.-80.replica", "Query was routed to the target before partial SwitchTraffic") workflowExec := tstWorkflowExec - if flavor == workflowFlavorVtctl { - workflowExec = tstWorkflowExecVtctl - } // We cannot Complete a partial move tables at the moment because // it will find that all traffic has (obviously) not been switched. @@ -337,7 +334,7 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", targetKeyspace, workflowName), binlogdatapb.VReplicationWorkflowState_Running.String()) catchup(t, targetTabDash80, workflowName, "MoveTables") - vdiff(t, targetKeyspace, workflowName, defaultCellName, false, true, nil) + vdiff(t, targetKeyspace, workflowName, defaultCellName, nil) mtDash80.SwitchReadsAndWrites() time.Sleep(loadTestBufferingWindowDuration + 1*time.Second) @@ -366,21 +363,20 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { err = workflowExec(t, "", reverseWf, "", reverseKs, "", workflowActionCancel, "", "", "", opts) require.NoError(t, err) - output, err := vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", "--shards", opts.shardSubset, fmt.Sprintf("%s.%s", reverseKs, reverseWf), "show") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", reverseKs, "show", "--workflow", reverseWf, "--shards", opts.shardSubset) require.Error(t, err) require.Contains(t, output, "no streams found") // Delete the original workflow - originalKsWf := fmt.Sprintf("%s.%s", targetKs, wf) - _, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", "--shards", opts.shardSubset, originalKsWf, "delete") + _, err = vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "delete", "--shards", opts.shardSubset) require.NoError(t, err) - output, err = vc.VtctlClient.ExecuteCommandWithOutput("Workflow", "--", "--shards", opts.shardSubset, originalKsWf, "show") + output, err = vc.VtctldClient.ExecuteCommandWithOutput("Workflow", "--keyspace", targetKs, "show", "--workflow", wf, "--shards", opts.shardSubset) require.Error(t, err) require.Contains(t, output, "no streams found") } // Confirm that the global routing rules are now gone. - output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetRoutingRules") require.NoError(t, err) require.Equal(t, emptyGlobalRoutingRules, output) @@ -390,7 +386,6 @@ func testPartialMoveTablesBasic(t *testing.T, flavor workflowFlavor) { // TestPartialMoveTablesBasic tests partial move tables by moving each // customer shard -- -80,80- -- once a a time to customer2. -// We test with both the vtctlclient and vtctldclient flavors. func TestPartialMoveTablesBasic(t *testing.T) { currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables testPartialMoveTablesBasic(t, workflowFlavorVtctld) diff --git a/go/test/endtoend/vreplication/reference_test.go b/go/test/endtoend/vreplication/reference_test.go index 8ff77de8708..efef799878b 100644 --- a/go/test/endtoend/vreplication/reference_test.go +++ b/go/test/endtoend/vreplication/reference_test.go @@ -119,8 +119,8 @@ func TestReferenceTableMaterializationAndRouting(t *testing.T) { require.NoError(t, err) vtgateConn.Close() - materialize(t, materializeCatSpec, false) - materialize(t, materializeMfgSpec, false) + materialize(t, materializeCatSpec) + materialize(t, materializeMfgSpec) tabDash80 := vc.getPrimaryTablet(t, sks, "-80") tab80Dash := vc.getPrimaryTablet(t, sks, "80-") diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 28ffc762ecd..a8f1996d0d9 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -89,7 +89,7 @@ func createReshardWorkflow(t *testing.T, sourceShards, targetShards string) erro confirmTablesHaveSecondaryKeys(t, []*cluster.VttabletProcess{targetTab1}, targetKs, "") catchup(t, targetTab1, workflowName, "Reshard") catchup(t, targetTab2, workflowName, "Reshard") - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") return nil } @@ -104,7 +104,7 @@ func createMoveTablesWorkflow(t *testing.T, tables string) { confirmTablesHaveSecondaryKeys(t, []*cluster.VttabletProcess{targetTab1}, targetKs, tables) catchup(t, targetTab1, workflowName, "MoveTables") catchup(t, targetTab2, workflowName, "MoveTables") - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") } func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error { @@ -112,7 +112,7 @@ func tstWorkflowAction(t *testing.T, action, tabletTypes, cells string) error { } // tstWorkflowExec executes a MoveTables or Reshard workflow command using -// vtctldclient. If you need to use the legacy vtctlclient, use +// vtctldclient. // tstWorkflowExecVtctl instead. func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, sourceShards, targetShards string, options *workflowExecOptions) error { @@ -181,74 +181,6 @@ func tstWorkflowExec(t *testing.T, cells, workflow, sourceKs, targetKs, tables, return nil } -// tstWorkflowExecVtctl executes a MoveTables or Reshard workflow command using -// vtctlclient. It should operate exactly the same way as tstWorkflowExec, but -// using the legacy client. -func tstWorkflowExecVtctl(t *testing.T, cells, workflow, sourceKs, targetKs, tables, action, tabletTypes, - sourceShards, targetShards string, options *workflowExecOptions) error { - - var args []string - if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { - args = append(args, "MoveTables") - } else { - args = append(args, "Reshard") - } - - args = append(args, "--") - - if BypassLagCheck { - args = append(args, "--max_replication_lag_allowed=2542087h") - } - if options.atomicCopy { - args = append(args, "--atomic-copy") - } - switch action { - case workflowActionCreate: - if currentWorkflowType == binlogdatapb.VReplicationWorkflowType_MoveTables { - args = append(args, "--source", sourceKs) - if tables != "" { - args = append(args, "--tables", tables) - } else { - args = append(args, "--all") - } - if sourceShards != "" { - args = append(args, "--source_shards", sourceShards) - } - } else { - args = append(args, "--source_shards", sourceShards, "--target_shards", targetShards) - } - // Test new experimental --defer-secondary-keys flag - switch currentWorkflowType { - case binlogdatapb.VReplicationWorkflowType_MoveTables, binlogdatapb.VReplicationWorkflowType_Migrate, binlogdatapb.VReplicationWorkflowType_Reshard: - if !options.atomicCopy && options.deferSecondaryKeys { - args = append(args, "--defer-secondary-keys") - } - args = append(args, "--initialize-target-sequences") // Only used for MoveTables - } - case workflowActionMirrorTraffic: - args = append(args, "--percent", strconv.FormatFloat(float64(options.percent), byte('f'), -1, 32)) - default: - if options.shardSubset != "" { - args = append(args, "--shards", options.shardSubset) - } - } - if cells != "" { - args = append(args, "--cells", cells) - } - if tabletTypes != "" { - args = append(args, "--tablet_types", tabletTypes) - } - args = append(args, "--timeout", time.Minute.String()) - ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) - args = append(args, action, ksWorkflow) - output, err := vc.VtctlClient.ExecuteCommandWithOutput(args...) - lastOutput = output - if err != nil { - return fmt.Errorf("%s: %s", err, output) - } - return nil -} - func tstWorkflowSwitchReads(t *testing.T, tabletTypes, cells string) { if tabletTypes == "" { tabletTypes = "replica,rdonly" @@ -288,23 +220,15 @@ func tstWorkflowComplete(t *testing.T) error { } // testWorkflowUpdate is a very simple test of the workflow update -// vtctlclient/vtctldclient command. +// vtctldclient command. // It performs a non-behavior impacting update, setting tablet-types // to primary,replica,rdonly (the only applicable types in these tests). func testWorkflowUpdate(t *testing.T) { tabletTypes := "primary,replica,rdonly" - // Test vtctlclient first. - _, err := vc.VtctlClient.ExecuteCommandWithOutput("workflow", "--", "--tablet-types", tabletTypes, "noexist.noexist", "update") - require.Error(t, err, err) - resp, err := vc.VtctlClient.ExecuteCommandWithOutput("workflow", "--", "--tablet-types", tabletTypes, ksWorkflow, "update") - require.NoError(t, err) - require.NotEmpty(t, resp) - - // Test vtctldclient last. - _, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", "noexist", "update", "--workflow", "noexist", "--tablet-types", tabletTypes) + _, err := vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", "noexist", "update", "--workflow", "noexist", "--tablet-types", tabletTypes) require.Error(t, err) // Change the tablet-types to rdonly. - resp, err = vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", "rdonly") + resp, err := vc.VtctldClient.ExecuteCommandWithOutput("workflow", "--keyspace", targetKs, "update", "--workflow", workflowName, "--tablet-types", "rdonly") require.NoError(t, err, err) // Confirm that we changed the workflow. var ures vtctldatapb.WorkflowUpdateResponse @@ -559,10 +483,10 @@ func testReplicatingWithPKEnumCols(t *testing.T) { insertQuery := "insert into customer(cid, name, typ, sport, meta) values(2, 'PaĆ¼l','soho','cricket',convert(x'7b7d' using utf8mb4))" execVtgateQuery(t, vtgateConn, sourceKs, deleteQuery) waitForNoWorkflowLag(t, vc, targetKs, workflowName) - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") execVtgateQuery(t, vtgateConn, sourceKs, insertQuery) waitForNoWorkflowLag(t, vc, targetKs, workflowName) - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") } func testReshardV2Workflow(t *testing.T) { @@ -697,9 +621,9 @@ func testMoveTablesV2Workflow(t *testing.T) { testRestOfWorkflow(t) // Create our primary intra-keyspace materialization. - materialize(t, materializeCustomerNameSpec, false) + materialize(t, materializeCustomerNameSpec) // Create a second one to confirm that multiple ones get migrated correctly. - materialize(t, materializeCustomerTypeSpec, false) + materialize(t, materializeCustomerTypeSpec) materializeShow() output, err = vc.VtctldClient.ExecuteCommandWithOutput(listAllArgs...) @@ -986,7 +910,7 @@ func moveCustomerTableSwitchFlows(t *testing.T, cells []*Cell, sourceCellOrAlias moveTablesAction(t, "Create", sourceCellOrAlias, workflow, sourceKs, targetKs, tables) catchup(t, targetTab1, workflow, workflowType) catchup(t, targetTab2, workflow, workflowType) - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") } allCellNames := getCellNames(cells) var switchReadsFollowedBySwitchWrites = func() { diff --git a/go/test/endtoend/vreplication/sidecardb_test.go b/go/test/endtoend/vreplication/sidecardb_test.go index be9ce67a626..704742d305b 100644 --- a/go/test/endtoend/vreplication/sidecardb_test.go +++ b/go/test/endtoend/vreplication/sidecardb_test.go @@ -14,7 +14,7 @@ import ( const GetCurrentTablesQuery = "show tables from _vt" func getSidecarDBTables(t *testing.T, tabletID string) (numTablets int, tables []string) { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", "--", "--json", tabletID, GetCurrentTablesQuery) + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ExecuteFetchAsDBA", "--json", tabletID, GetCurrentTablesQuery) require.NoError(t, err) result := gjson.Get(output, "rows") require.NotNil(t, result) @@ -118,7 +118,7 @@ func validateSidecarDBTables(t *testing.T, tabletID string, tables []string) { func modifySidecarDBSchema(t *testing.T, vc *VitessCluster, tabletID string, ddls []string) (numChanges int) { for _, ddl := range ddls { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", "--", tabletID, ddl) + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ExecuteFetchAsDBA", tabletID, ddl) require.NoErrorf(t, err, output) } return len(ddls) diff --git a/go/test/endtoend/vreplication/time_zone_test.go b/go/test/endtoend/vreplication/time_zone_test.go index 2c0a9a4f5a5..eaae73a3081 100644 --- a/go/test/endtoend/vreplication/time_zone_test.go +++ b/go/test/endtoend/vreplication/time_zone_test.go @@ -87,14 +87,13 @@ func TestMoveTablesTZ(t *testing.T) { tables := "datze" - ksErrorWorkflow := fmt.Sprintf("%s.%s", targetKs, "tzerr") - output, err := vc.VtctlClient.ExecuteCommandWithOutput("MoveTables", "--", "--source", sourceKs, "--tables", - tables, "--source_time_zone", "US/Pacifik", "Create", ksErrorWorkflow) + output, err := vc.VtctldClient.ExecuteCommandWithOutput("MoveTables", "--workflow", workflow, "--target-keyspace", targetKs, "Create", + "--source-keyspace", sourceKs, "--tables", tables, "--source-time-zone", "US/Pacifik") require.Error(t, err, output) require.Contains(t, output, "time zone is invalid") - output, err = vc.VtctlClient.ExecuteCommandWithOutput("MoveTables", "--", "--source", sourceKs, "--tables", - tables, "--source_time_zone", "US/Pacific", "Create", ksWorkflow) + output, err = vc.VtctldClient.ExecuteCommandWithOutput("MoveTables", "--workflow", workflow, "--target-keyspace", targetKs, "Create", + "--source-keyspace", sourceKs, "--tables", tables, "--source-time-zone", "US/Pacific") require.NoError(t, err, output) catchup(t, customerTab, workflow, "MoveTables") @@ -105,7 +104,7 @@ func TestMoveTablesTZ(t *testing.T) { _, err = vtgateConn.ExecuteFetch("insert into datze(id, dt2) values (12, '2022-04-01 5:06:07')", 1, false) // dst require.NoError(t, err) - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") // update to test date conversions in replication (vplayer) mode (update statements) _, err = vtgateConn.ExecuteFetch("update datze set dt2 = '2022-04-01 5:06:07' where id = 11", 1, false) // dst @@ -113,7 +112,7 @@ func TestMoveTablesTZ(t *testing.T) { _, err = vtgateConn.ExecuteFetch("update datze set dt2 = '2022-01-01 10:20:30' where id = 12", 1, false) // standard time require.NoError(t, err) - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") query := "select * from datze" qrSourceUSPacific, err := productTab.QueryTablet(query, sourceKs, true) @@ -192,5 +191,5 @@ func TestMoveTablesTZ(t *testing.T) { // inserts to test date conversions in reverse replication execVtgateQuery(t, vtgateConn, "customer", "insert into datze(id, dt2) values (13, '2022-01-01 18:20:30')") execVtgateQuery(t, vtgateConn, "customer", "insert into datze(id, dt2) values (14, '2022-04-01 12:06:07')") - vdiffSideBySide(t, ksReverseWorkflow, "") + doVDiff(t, ksReverseWorkflow, "") } diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 612ba00236b..6116e26eef5 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -210,17 +210,19 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, statsShard := arrTargetShards[0] statsTablet := vc.getPrimaryTablet(t, tc.targetKs, statsShard) var args []string - args = append(args, tc.typ, "--") - args = append(args, "--source", tc.sourceKs) - if tc.typ == "Reshard" { - args = append(args, "--source_shards", tc.sourceShards, "--target_shards", tc.targetShards) - } + args = append(args, tc.typ) + args = append(args, "--workflow", tc.workflow) + args = append(args, "--target-keyspace", tc.targetKs) allCellNames := getCellNames(nil) + args = append(args, "create") args = append(args, "--cells", allCellNames) - args = append(args, "--tables", tc.tables) - args = append(args, "Create") - args = append(args, ksWorkflow) - err := vc.VtctlClient.ExecuteCommand(args...) + if tc.typ == "Reshard" { + args = append(args, "--source-shards", tc.sourceShards, "--target-shards", tc.targetShards) + } else { + args = append(args, "--source-keyspace", tc.sourceKs) + args = append(args, "--tables", tc.tables) + } + err := vc.VtctldClient.ExecuteCommand(args...) require.NoError(t, err) waitForShardsToCatchup := func() { @@ -279,8 +281,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, waitForShardsToCatchup() tc.vdiffCount++ // We only did vtctldclient vdiff create } else { - vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil) - tc.vdiffCount += 2 // We did vtctlclient AND vtctldclient vdiff create + vdiff(t, tc.targetKs, tc.workflow, allCellNames, nil) + tc.vdiffCount++ } checkVDiffCountStat(t, statsTablet, tc.vdiffCount) @@ -288,7 +290,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, // compared by vdiff per table at the controller level -- works as expected. vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"}) require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err) - uuid, jsout := performVDiff2Action(t, false, ksWorkflow, allCellNames, "show", "last", false, "--verbose") + uuid, jsout := performVDiff2Action(t, ksWorkflow, allCellNames, "show", "last", false, "--verbose") expect := gjson.Get(jsout, fmt.Sprintf("Reports.customer.%s", statsShard)).Int() got := gjson.Get(vdrc, fmt.Sprintf("%s.%s.%s", tc.workflow, uuid, "customer")).Int() require.Equal(t, expect, got, "expected VDiffRowsCompared stat to be %d, but got %d", expect, got) @@ -307,12 +309,12 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, if tc.stop { testStop(t, ksWorkflow, allCellNames) - tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create + tc.vdiffCount++ } if tc.testCLICreateWait { testCLICreateWait(t, ksWorkflow, allCellNames) - tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create + tc.vdiffCount++ } if tc.testCLIErrors { testCLIErrors(t, ksWorkflow, allCellNames) @@ -330,14 +332,14 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, // Create another VDiff record to confirm it gets deleted when the workflow is completed. ts := time.Now() - uuid, _ = performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false) - waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts) + uuid, _ = performVDiff2Action(t, ksWorkflow, allCellNames, "create", "", false) + waitForVDiff2ToComplete(t, ksWorkflow, allCellNames, uuid, ts) tc.vdiffCount++ checkVDiffCountStat(t, statsTablet, tc.vdiffCount) - err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "SwitchTraffic", ksWorkflow) + err = vc.VtctldClient.ExecuteCommand(tc.typ, "--workflow", tc.workflow, "--target-keyspace", tc.targetKs, "SwitchTraffic") require.NoError(t, err) - err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "Complete", ksWorkflow) + err = vc.VtctldClient.ExecuteCommand(tc.typ, "--workflow", tc.workflow, "--target-keyspace", tc.targetKs, "Complete") require.NoError(t, err) // Confirm the VDiff data is deleted for the workflow. @@ -348,18 +350,18 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, func testCLIErrors(t *testing.T, ksWorkflow, cells string) { t.Run("Client error handling", func(t *testing.T) { - _, output := performVDiff2Action(t, false, ksWorkflow, cells, "badcmd", "", true) + _, output := performVDiff2Action(t, ksWorkflow, cells, "badcmd", "", true) require.Contains(t, output, "Usage:") - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "create", "invalid_uuid", true) + _, output = performVDiff2Action(t, ksWorkflow, cells, "create", "invalid_uuid", true) require.Contains(t, output, "invalid UUID provided") - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "resume", "invalid_uuid", true) + _, output = performVDiff2Action(t, ksWorkflow, cells, "resume", "invalid_uuid", true) require.Contains(t, output, "invalid UUID provided") - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "delete", "invalid_uuid", true) + _, output = performVDiff2Action(t, ksWorkflow, cells, "delete", "invalid_uuid", true) require.Contains(t, output, "invalid argument provided") - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "show", "invalid_uuid", true) + _, output = performVDiff2Action(t, ksWorkflow, cells, "show", "invalid_uuid", true) require.Contains(t, output, "invalid argument provided") - uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "show", "last", false) - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "create", uuid, true) + uuid, _ := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false) + _, output = performVDiff2Action(t, ksWorkflow, cells, "create", uuid, true) require.Contains(t, output, "already exists") }) } @@ -432,7 +434,7 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell // Delete this vdiff as we used --auto-start=false and thus it never starts and // does not provide the normally expected show --verbose --format=json output. - _, output := performVDiff2Action(t, false, fmt.Sprintf("%s.%s", targetKs, workflowName), "", "delete", vduuid.String(), false) + _, output := performVDiff2Action(t, fmt.Sprintf("%s.%s", targetKs, workflowName), "", "delete", vduuid.String(), false) require.Equal(t, "completed", gjson.Get(output, "Status").String()) }) } @@ -450,35 +452,35 @@ func testDelete(t *testing.T, ksWorkflow, cells string) { } return int64(len(seen)) } - _, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", "all", false) + _, output := performVDiff2Action(t, ksWorkflow, cells, "show", "all", false) initialVDiffCount := uuidCount(gjson.Get(output, "#.UUID").Array()) for ; initialVDiffCount < 3; initialVDiffCount++ { - _, _ = performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false) + _, _ = performVDiff2Action(t, ksWorkflow, cells, "create", "", false) } // Now let's confirm that we have at least 3 unique VDiffs. - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "show", "all", false) + _, output = performVDiff2Action(t, ksWorkflow, cells, "show", "all", false) require.GreaterOrEqual(t, uuidCount(gjson.Get(output, "#.UUID").Array()), int64(3)) // And that our initial count is what we expect. require.Equal(t, initialVDiffCount, uuidCount(gjson.Get(output, "#.UUID").Array())) // Test show last with verbose too as a side effect. - uuid, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", "last", false, "--verbose") + uuid, output := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false, "--verbose") // The TableSummary is only present with --verbose. require.Contains(t, output, `"TableSummary":`) // Now let's delete one of the VDiffs. - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "delete", uuid, false) + _, output = performVDiff2Action(t, ksWorkflow, cells, "delete", uuid, false) require.Equal(t, "completed", gjson.Get(output, "Status").String()) // And confirm that our unique VDiff count has only decreased by one. - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "show", "all", false) + _, output = performVDiff2Action(t, ksWorkflow, cells, "show", "all", false) require.Equal(t, initialVDiffCount-1, uuidCount(gjson.Get(output, "#.UUID").Array())) // Now let's delete all of them. - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "delete", "all", false) + _, output = performVDiff2Action(t, ksWorkflow, cells, "delete", "all", false) require.Equal(t, "completed", gjson.Get(output, "Status").String()) // And finally confirm that we have no more VDiffs. - _, output = performVDiff2Action(t, false, ksWorkflow, cells, "show", "all", false) + _, output = performVDiff2Action(t, ksWorkflow, cells, "show", "all", false) require.Equal(t, int64(0), gjson.Get(output, "#").Int()) }) } @@ -502,7 +504,7 @@ func testResume(t *testing.T, tc *testCase, cells string) { ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow) // Confirm the last VDiff is in the expected completed state. - uuid, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", "last", false) + uuid, output := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false) jsonOutput := getVDiffInfo(output) require.Equal(t, "completed", jsonOutput.State) // Save the number of rows compared in previous runs. @@ -518,8 +520,8 @@ func testResume(t *testing.T, tc *testCase, cells string) { // confirm that the VDiff was resumed, able to complete, and we compared the // expected number of rows in total (original run and resume) - _, _ = performVDiff2Action(t, false, ksWorkflow, cells, "resume", uuid, false) - info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, ogTime) + _, _ = performVDiff2Action(t, ksWorkflow, cells, "resume", uuid, false) + info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, ogTime) require.NotNil(t, info) require.False(t, info.HasMismatch) require.Equal(t, expectedRows, info.RowsCompared) @@ -529,10 +531,10 @@ func testResume(t *testing.T, tc *testCase, cells string) { func testStop(t *testing.T, ksWorkflow, cells string) { t.Run("Stop", func(t *testing.T) { // Create a new VDiff and immediately stop it. - uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false) - _, _ = performVDiff2Action(t, false, ksWorkflow, cells, "stop", uuid, false) + uuid, _ := performVDiff2Action(t, ksWorkflow, cells, "create", "", false) + _, _ = performVDiff2Action(t, ksWorkflow, cells, "stop", uuid, false) // Confirm the VDiff is in the expected state. - _, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", uuid, false) + _, output := performVDiff2Action(t, ksWorkflow, cells, "show", uuid, false) jsonOutput := getVDiffInfo(output) // It may have been able to complete before we could stop it (there's virtually no data // to diff). There's no way to avoid this potential race so don't consider that a failure. @@ -549,7 +551,7 @@ func testAutoRetryError(t *testing.T, tc *testCase, cells string) { ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow) // Confirm the last VDiff is in the expected completed state. - uuid, output := performVDiff2Action(t, false, ksWorkflow, cells, "show", "last", false) + uuid, output := performVDiff2Action(t, ksWorkflow, cells, "show", "last", false) jsonOutput := getVDiffInfo(output) require.Equal(t, "completed", jsonOutput.State) // Save the number of rows compared in the first run. @@ -576,7 +578,7 @@ func testAutoRetryError(t *testing.T, tc *testCase, cells string) { // Confirm that the VDiff was retried, able to complete, and we compared the expected // number of rows in total (original run and retry). - info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, ogTime) + info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, ogTime) require.NotNil(t, info) require.False(t, info.HasMismatch) require.Equal(t, expectedRows, info.RowsCompared) @@ -584,10 +586,10 @@ func testAutoRetryError(t *testing.T, tc *testCase, cells string) { } func testCLICreateWait(t *testing.T, ksWorkflow string, cells string) { - t.Run("vtctl create and wait", func(t *testing.T) { + t.Run("vtctldclient create and wait", func(t *testing.T) { chCompleted := make(chan bool) go func() { - _, output := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, "--wait", "--wait-update-interval=1s") + _, output := performVDiff2Action(t, ksWorkflow, cells, "create", "", false, "--wait", "--wait-update-interval=1s") completed := false // We don't try to parse the JSON output as it may contain a series of outputs // that together do not form a valid JSON document. We can change this in the diff --git a/go/test/endtoend/vreplication/vdiff_helper_test.go b/go/test/endtoend/vreplication/vdiff_helper_test.go index fd223d78082..fcc112b670b 100644 --- a/go/test/endtoend/vreplication/vdiff_helper_test.go +++ b/go/test/endtoend/vreplication/vdiff_helper_test.go @@ -45,55 +45,18 @@ var ( runVDiffsSideBySide = true ) -func vdiff(t *testing.T, keyspace, workflow, cells string, vtctlclient, vtctldclient bool, wantV2Result *expectedVDiff2Result) { - if vtctlclient { - doVtctlclientVDiff(t, keyspace, workflow, cells, wantV2Result) - } - if vtctldclient { - doVtctldclientVDiff(t, keyspace, workflow, cells, wantV2Result) - } +func vdiff(t *testing.T, keyspace, workflow, cells string, wantV2Result *expectedVDiff2Result) { + doVtctldclientVDiff(t, keyspace, workflow, cells, wantV2Result) } -// vdiffSideBySide will run the VDiff command using both vtctlclient -// and vtctldclient. -func vdiffSideBySide(t *testing.T, ksWorkflow, cells string) { +func doVDiff(t *testing.T, ksWorkflow, cells string) { arr := strings.Split(ksWorkflow, ".") keyspace := arr[0] workflowName := arr[1] - if !runVDiffsSideBySide { - doVtctlclientVDiff(t, keyspace, workflowName, cells, nil) - return - } - vdiff(t, keyspace, workflowName, cells, true, true, nil) -} - -func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *expectedVDiff2Result) { - ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow) - t.Run(fmt.Sprintf("vtctlclient vdiff %s", ksWorkflow), func(t *testing.T) { - // update-table-stats is needed in order to test progress reports. - uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry", - "--update-table-stats", fmt.Sprintf("--filtered_replication_wait_time=%v", vdiffTimeout/2)) - info := waitForVDiff2ToComplete(t, true, ksWorkflow, cells, uuid, time.Time{}) - require.NotNil(t, info) - require.Equal(t, workflow, info.Workflow) - require.Equal(t, keyspace, info.Keyspace) - if want != nil { - require.Equal(t, want.state, info.State) - require.Equal(t, strings.Join(want.shards, ","), info.Shards) - require.Equal(t, want.hasMismatch, info.HasMismatch) - } else { - require.Equal(t, "completed", info.State, "vdiff results: %+v", info) - require.False(t, info.HasMismatch, "vdiff results: %+v", info) - require.NotZero(t, info.RowsCompared) - } - if strings.Contains(t.Name(), "AcrossDBVersions") { - log.Errorf("VDiff resume cannot be guaranteed between major MySQL versions due to implied collation differences, skipping resume test...") - return - } - }) + doVtctldclientVDiff(t, keyspace, workflowName, cells, nil) } -func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cells, uuid string, completedAtMin time.Time) *vdiffInfo { +func waitForVDiff2ToComplete(t *testing.T, ksWorkflow, cells, uuid string, completedAtMin time.Time) *vdiffInfo { var info *vdiffInfo var jsonStr string first := true @@ -102,7 +65,7 @@ func waitForVDiff2ToComplete(t *testing.T, useVtctlclient bool, ksWorkflow, cell go func() { for { time.Sleep(vdiffStatusCheckInterval) - _, jsonStr = performVDiff2Action(t, useVtctlclient, ksWorkflow, cells, "show", uuid, false) + _, jsonStr = performVDiff2Action(t, ksWorkflow, cells, "show", uuid, false) info = getVDiffInfo(jsonStr) require.NotNil(t, info) if info.State == "completed" { @@ -169,8 +132,8 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e if len(extraFlags) > 0 { flags = append(flags, extraFlags...) } - uuid, _ := performVDiff2Action(t, false, ksWorkflow, cells, "create", "", false, flags...) - info := waitForVDiff2ToComplete(t, false, ksWorkflow, cells, uuid, time.Time{}) + uuid, _ := performVDiff2Action(t, ksWorkflow, cells, "create", "", false, flags...) + info := waitForVDiff2ToComplete(t, ksWorkflow, cells, uuid, time.Time{}) require.NotNil(t, info) require.Equal(t, workflow, info.Workflow) require.Equal(t, keyspace, info.Keyspace) @@ -191,56 +154,34 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e }) } -func performVDiff2Action(t *testing.T, useVtctlclient bool, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) { +func performVDiff2Action(t *testing.T, ksWorkflow, cells, action, actionArg string, expectError bool, extraFlags ...string) (uuid string, output string) { var err error targetKeyspace, workflowName, ok := strings.Cut(ksWorkflow, ".") require.True(t, ok, "invalid keyspace.workflow value: %s", ksWorkflow) waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) - if useVtctlclient { + args := []string{"VDiff", "--target-keyspace", targetKeyspace, "--workflow", workflowName, "--format=json", action} + if strings.ToLower(action) == string(vdiff2.CreateAction) { // This will always result in us using a PRIMARY tablet, which is all // we start in many e2e tests, but it avoids the tablet picker logic // where when you ONLY specify the PRIMARY type it then picks the // shard's primary and ignores any cell settings. - args := []string{"VDiff", "--", "--tablet_types=in_order:primary,replica", "--source_cell=" + cells, "--format=json"} - if len(extraFlags) > 0 { - args = append(args, extraFlags...) - } - args = append(args, ksWorkflow, action, actionArg) - output, err = execVDiffWithRetry(t, expectError, false, args) - log.Infof("vdiff output: %+v (err: %+v)", output, err) - if !expectError { - require.Nil(t, err) - uuid = gjson.Get(output, "UUID").String() - if action != "delete" && !(action == "show" && actionArg == "all") { // A UUID is not required - require.NoError(t, err) - require.NotEmpty(t, uuid) - } - } - } else { - args := []string{"VDiff", "--target-keyspace", targetKeyspace, "--workflow", workflowName, "--format=json", action} - if strings.ToLower(action) == string(vdiff2.CreateAction) { - // This will always result in us using a PRIMARY tablet, which is all - // we start in many e2e tests, but it avoids the tablet picker logic - // where when you ONLY specify the PRIMARY type it then picks the - // shard's primary and ignores any cell settings. - args = append(args, "--tablet-types=primary,replica", "--tablet-types-in-preference-order", "--source-cells="+cells) - } - if len(extraFlags) > 0 { - args = append(args, extraFlags...) - } - if actionArg != "" { - args = append(args, actionArg) - } + args = append(args, "--tablet-types=primary,replica", "--tablet-types-in-preference-order", "--source-cells="+cells) + } + if len(extraFlags) > 0 { + args = append(args, extraFlags...) + } + if actionArg != "" { + args = append(args, actionArg) + } - output, err = execVDiffWithRetry(t, expectError, true, args) - log.Infof("vdiff output: %+v (err: %+v)", output, err) - if !expectError { - require.NoError(t, err) - ouuid := gjson.Get(output, "UUID").String() - if action == "create" || (action == "show" && actionArg != "all") { // A UUID is returned - require.NotEmpty(t, ouuid) - uuid = ouuid - } + output, err = execVDiffWithRetry(t, expectError, args) + log.Infof("vdiff output: %+v (err: %+v)", output, err) + if !expectError { + require.NoError(t, err) + ouuid := gjson.Get(output, "UUID").String() + if action == "create" || (action == "show" && actionArg != "all") { // A UUID is returned + require.NotEmpty(t, ouuid) + uuid = ouuid } } @@ -264,7 +205,7 @@ type vdiffResult struct { } // execVDiffWithRetry will ignore transient errors that can occur during workflow state changes. -func execVDiffWithRetry(t *testing.T, expectError bool, useVtctldClient bool, args []string) (string, error) { +func execVDiffWithRetry(t *testing.T, expectError bool, args []string) (string, error) { ctx, cancel := context.WithTimeout(context.Background(), vdiffRetryTimeout) defer cancel() vdiffResultCh := make(chan vdiffResult) @@ -282,11 +223,7 @@ func execVDiffWithRetry(t *testing.T, expectError bool, useVtctldClient bool, ar time.Sleep(vdiffRetryInterval) } retry = false - if useVtctldClient { - output, err = vc.VtctldClient.ExecuteCommandWithOutput(args...) - } else { - output, err = vc.VtctlClient.ExecuteCommandWithOutput(args...) - } + output, err = vc.VtctldClient.ExecuteCommandWithOutput(args...) if err != nil { if expectError { result := vdiffResult{output: output, err: err} diff --git a/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go b/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go index a4c25941801..d668701100e 100644 --- a/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go +++ b/go/test/endtoend/vreplication/vdiff_multiple_movetables_test.go @@ -102,7 +102,7 @@ func TestMultipleConcurrentVDiffs(t *testing.T) { doVdiff := func(workflowName, table string) { defer wg.Done() - vdiff(t, targetKeyspace, workflowName, cellName, true, false, nil) + vdiff(t, targetKeyspace, workflowName, cellName, nil) } go doVdiff("wf1", "customer") go doVdiff("wf2", "customer2") diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 955afde2f18..0a5664b8486 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -336,7 +336,7 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string defer vtgateConn.Close() verifyClusterHealth(t, vc) insertInitialData(t) - materializeRollup(t, true) + materializeRollup(t) shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName, false) @@ -351,11 +351,11 @@ func testVreplicationWorkflows(t *testing.T, limited bool, binlogRowImage string return } - materializeProduct(t, true) + materializeProduct(t) - materializeMerchantOrders(t, true) - materializeSales(t, true) - materializeMerchantSales(t, true) + materializeMerchantOrders(t) + materializeSales(t) + materializeMerchantSales(t) reshardMerchant2to3SplitMerge(t) reshardMerchant3to1Merge(t) @@ -499,13 +499,13 @@ func TestVStreamFlushBinlog(t *testing.T) { // Now we should rotate the binary logs ONE time on the source, even // though we're opening up multiple result streams (1 per table). runVDiffsSideBySide = false - vdiff(t, targetKs, workflow, defaultCellName, true, false, nil) + vdiff(t, targetKs, workflow, defaultCellName, nil) flushCount = int64(sourceTab.GetVars()["VStreamerFlushedBinlogs"].(float64)) require.Equal(t, flushCount, int64(1), "VStreamerFlushedBinlogs should now be 1") // Now if we do another vdiff, we should NOT rotate the binlogs again // as we haven't been generating a lot of new binlog events. - vdiff(t, targetKs, workflow, defaultCellName, true, false, nil) + vdiff(t, targetKs, workflow, defaultCellName, nil) flushCount = int64(sourceTab.GetVars()["VStreamerFlushedBinlogs"].(float64)) require.Equal(t, flushCount, int64(1), "VStreamerFlushedBinlogs should still be 1") } @@ -802,7 +802,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("update `%s` set name='xyz'", tbl)) } } - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") cellNames := getCellNames(cells) switchReadsDryRun(t, workflowType, cellNames, ksWorkflow, dryRunResultsReadCustomerShard) switchReads(t, workflowType, cellNames, ksWorkflow, false) @@ -834,7 +834,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl catchup(t, productTab, workflow, "MoveTables") - vdiffSideBySide(t, "product.p2c_reverse", "") + doVDiff(t, "product.p2c_reverse", "") if withOpenTx { execVtgateQuery(t, vtgateConn, "", deleteOpenTxQuery) } @@ -1069,7 +1069,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou } } restartWorkflow(t, ksWorkflow) - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") if dryRunResultSwitchReads != nil { reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "rdonly,replica", "--dry-run") } @@ -1108,7 +1108,7 @@ func shardOrders(t *testing.T) { workflowType := "MoveTables" catchup(t, customerTab1, workflow, workflowType) catchup(t, customerTab2, workflow, workflowType) - vdiffSideBySide(t, ksWorkflow, "") + doVDiff(t, ksWorkflow, "") switchReads(t, workflowType, strings.Join(vc.CellNames, ","), ksWorkflow, false) switchWrites(t, workflowType, ksWorkflow, false) moveTablesAction(t, "Complete", cell, workflow, sourceKs, targetKs, tables) @@ -1119,18 +1119,11 @@ func shardOrders(t *testing.T) { } func checkThatVDiffFails(t *testing.T, keyspace, workflow string) { - ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow) - t.Run("check that vdiffSideBySide won't run", func(t2 *testing.T) { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "--", "--v1", ksWorkflow) + t.Run("check that vdiff won't run", func(t2 *testing.T) { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("VDiff", "--workflow", workflow, "--target-keyspace", keyspace, "create") require.Error(t, err) require.Contains(t, output, "invalid VDiff run") }) - t.Run("check that vdiff2 won't run", func(t2 *testing.T) { - output, err := vc.VtctlClient.ExecuteCommandWithOutput("VDiff", "--", ksWorkflow) - require.Error(t, err) - require.Contains(t, output, "invalid VDiff run") - - }) } func shardMerchant(t *testing.T) { @@ -1155,7 +1148,7 @@ func shardMerchant(t *testing.T) { catchup(t, merchantTab1, workflow, workflowType) catchup(t, merchantTab2, workflow, workflowType) - vdiffSideBySide(t, fmt.Sprintf("%s.%s", merchantKeyspace, workflow), "") + doVDiff(t, fmt.Sprintf("%s.%s", merchantKeyspace, workflow), "") switchReads(t, workflowType, strings.Join(vc.CellNames, ","), ksWorkflow, false) switchWrites(t, workflowType, ksWorkflow, false) printRoutingRules(t, vc, "After merchant movetables") @@ -1174,34 +1167,27 @@ func shardMerchant(t *testing.T) { }) } -func materialize(t *testing.T, spec string, useVtctldClient bool) { - if useVtctldClient { - t.Run("vtctldclient materialize", func(t *testing.T) { - // Split out the parameters from the JSON spec for - // use in the vtctldclient command flags. - // This allows us to test both clients with the same - // input. - sj := gjson.Parse(spec) - workflow := sj.Get("workflow").String() - require.NotEmpty(t, workflow, "workflow not found in spec: %s", spec) - sourceKeyspace := sj.Get("source_keyspace").String() - require.NotEmpty(t, sourceKeyspace, "source_keyspace not found in spec: %s", spec) - targetKeyspace := sj.Get("target_keyspace").String() - require.NotEmpty(t, targetKeyspace, "target_keyspace not found in spec: %s", spec) - tableSettings := sj.Get("table_settings").String() - require.NotEmpty(t, tableSettings, "table_settings not found in spec: %s", spec) - stopAfterCopy := sj.Get("stop-after-copy").Bool() // Optional - err := vc.VtctldClient.ExecuteCommand("materialize", "--workflow", workflow, "--target-keyspace", targetKeyspace, - "create", "--source-keyspace", sourceKeyspace, "--table-settings", tableSettings, - fmt.Sprintf("--stop-after-copy=%t", stopAfterCopy)) - require.NoError(t, err, "Materialize") - }) - } else { - t.Run("materialize", func(t *testing.T) { - err := vc.VtctlClient.ExecuteCommand("Materialize", spec) - require.NoError(t, err, "Materialize") - }) - } +func materialize(t *testing.T, spec string) { + t.Run("vtctldclient materialize", func(t *testing.T) { + // Split out the parameters from the JSON spec for + // use in the vtctldclient command flags. + // This allows us to test both clients with the same + // input. + sj := gjson.Parse(spec) + workflow := sj.Get("workflow").String() + require.NotEmpty(t, workflow, "workflow not found in spec: %s", spec) + sourceKeyspace := sj.Get("source_keyspace").String() + require.NotEmpty(t, sourceKeyspace, "source_keyspace not found in spec: %s", spec) + targetKeyspace := sj.Get("target_keyspace").String() + require.NotEmpty(t, targetKeyspace, "target_keyspace not found in spec: %s", spec) + tableSettings := sj.Get("table_settings").String() + require.NotEmpty(t, tableSettings, "table_settings not found in spec: %s", spec) + stopAfterCopy := sj.Get("stop-after-copy").Bool() // Optional + err := vc.VtctldClient.ExecuteCommand("materialize", "--workflow", workflow, "--target-keyspace", targetKeyspace, + "create", "--source-keyspace", sourceKeyspace, "--table-settings", tableSettings, + fmt.Sprintf("--stop-after-copy=%t", stopAfterCopy)) + require.NoError(t, err, "Materialize") + }) } func testMaterializeWithNonExistentTable(t *testing.T) { @@ -1216,14 +1202,14 @@ func testMaterializeWithNonExistentTable(t *testing.T) { }) } -func materializeProduct(t *testing.T, useVtctldClient bool) { +func materializeProduct(t *testing.T) { t.Run("materializeProduct", func(t *testing.T) { // Materializing from "product" keyspace to "customer" keyspace. workflow := "cproduct" keyspace := "customer" defaultCell := vc.Cells[vc.CellNames[0]] applyVSchema(t, materializeProductVSchema, keyspace) - materialize(t, materializeProductSpec, useVtctldClient) + materialize(t, materializeProductSpec) customerTablets := vc.getVttabletsInKeyspace(t, defaultCell, keyspace, "primary") for _, tab := range customerTablets { catchup(t, tab, workflow, "Materialize") @@ -1303,7 +1289,7 @@ func materializeProduct(t *testing.T, useVtctldClient bool) { }) } -func materializeRollup(t *testing.T, useVtctldClient bool) { +func materializeRollup(t *testing.T) { t.Run("materializeRollup", func(t *testing.T) { vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() @@ -1312,7 +1298,7 @@ func materializeRollup(t *testing.T, useVtctldClient bool) { applyVSchema(t, materializeSalesVSchema, keyspace) defaultCell := vc.Cells[vc.CellNames[0]] productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet - materialize(t, materializeRollupSpec, useVtctldClient) + materialize(t, materializeRollupSpec) catchup(t, productTab, workflow, "Materialize") waitForRowCount(t, vtgateConn, "product", "rollup", 1) waitForQueryResult(t, vtgateConn, "product:0", "select rollupname, kount from rollup", @@ -1320,13 +1306,13 @@ func materializeRollup(t *testing.T, useVtctldClient bool) { }) } -func materializeSales(t *testing.T, useVtctldClient bool) { +func materializeSales(t *testing.T) { t.Run("materializeSales", func(t *testing.T) { vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() keyspace := "product" applyVSchema(t, materializeSalesVSchema, keyspace) - materialize(t, materializeSalesSpec, useVtctldClient) + materialize(t, materializeSalesSpec) defaultCell := vc.Cells[vc.CellNames[0]] productTab := vc.Cells[defaultCell.Name].Keyspaces["product"].Shards["0"].Tablets["zone1-100"].Vttablet catchup(t, productTab, "sales", "Materialize") @@ -1336,12 +1322,12 @@ func materializeSales(t *testing.T, useVtctldClient bool) { }) } -func materializeMerchantSales(t *testing.T, useVtctldClient bool) { +func materializeMerchantSales(t *testing.T) { t.Run("materializeMerchantSales", func(t *testing.T) { vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() workflow := "msales" - materialize(t, materializeMerchantSalesSpec, useVtctldClient) + materialize(t, materializeMerchantSalesSpec) defaultCell := vc.Cells[vc.CellNames[0]] merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, merchantKeyspace, "primary") for _, tab := range merchantTablets { @@ -1353,14 +1339,14 @@ func materializeMerchantSales(t *testing.T, useVtctldClient bool) { }) } -func materializeMerchantOrders(t *testing.T, useVtctldClient bool) { +func materializeMerchantOrders(t *testing.T) { t.Run("materializeMerchantOrders", func(t *testing.T) { vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) defer vtgateConn.Close() workflow := "morders" keyspace := merchantKeyspace applyVSchema(t, merchantOrdersVSchema, keyspace) - materialize(t, materializeMerchantOrdersSpec, useVtctldClient) + materialize(t, materializeMerchantOrdersSpec) defaultCell := vc.Cells[vc.CellNames[0]] merchantTablets := vc.getVttabletsInKeyspace(t, defaultCell, merchantKeyspace, "primary") for _, tab := range merchantTablets { diff --git a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go index 4ee977c4d74..7a7247b39bf 100644 --- a/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go +++ b/go/test/endtoend/vreplication/vreplication_vtctldclient_cli_test.go @@ -180,7 +180,7 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK for _, tab := range targetTabs { alias := fmt.Sprintf("zone1-%d", tab.TabletUID) query := "update _vt.vreplication set source := replace(source, 'stop_after_copy:true', 'stop_after_copy:false') where db_name = 'vt_customer' and workflow = 'wf1'" - output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", alias, query) + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ExecuteFetchAsDBA", alias, query) require.NoError(t, err, output) } confirmNoRoutingRules(t) @@ -263,7 +263,7 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) // Confirm that everything is still in sync after our switch fest. - vdiff(t, targetKeyspace, workflowName, "zone1", false, true, nil) + vdiff(t, targetKeyspace, workflowName, "zone1", nil) (*mt).SwitchReadsAndWrites() validateReadsRouteToTarget(t, "replica") @@ -282,7 +282,7 @@ func testMoveTablesFlags2(t *testing.T, mt *iMoveTables, sourceKeyspace, targetK func testMoveTablesFlags3(t *testing.T, sourceKeyspace, targetKeyspace string, targetTabs map[string]*cluster.VttabletProcess) { for _, tab := range targetTabs { alias := fmt.Sprintf("zone1-%d", tab.TabletUID) - output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", alias, "drop table customer") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ExecuteFetchAsDBA", alias, "drop table customer") require.NoError(t, err, output) } createFlags := []string{} @@ -492,7 +492,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards for _, tab := range targetTabs { alias := fmt.Sprintf("zone1-%d", tab.TabletUID) query := "update _vt.vreplication set source := replace(source, 'stop_after_copy:true', 'stop_after_copy:false') where db_name = 'vt_customer' and workflow = '" + workflowName + "'" - output, err := vc.VtctlClient.ExecuteCommandWithOutput("ExecuteFetchAsDba", alias, query) + output, err := vc.VtctldClient.ExecuteCommandWithOutput("ExecuteFetchAsDBA", alias, query) require.NoError(t, err, output) } rs.Start() @@ -504,7 +504,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards for _, targetTab := range targetTabs { catchup(t, targetTab, workflowName, "Reshard") } - vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + vdiff(t, keyspace, workflowName, "zone1", nil) shardReadsRouteToSource := func() { require.True(t, getShardRoute(t, keyspace, "-80", "replica")) @@ -524,14 +524,14 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards rs.SwitchReadsAndWrites() waitForLowLag(t, keyspace, workflowName+"_reverse") - vdiff(t, keyspace, workflowName+"_reverse", "zone1", true, false, nil) + vdiff(t, keyspace, workflowName+"_reverse", "zone1", nil) shardReadsRouteToTarget() shardWritesRouteToTarget() confirmStates(t, &wf, wrangler.WorkflowStateNotSwitched, wrangler.WorkflowStateAllSwitched) rs.ReverseReadsAndWrites() waitForLowLag(t, keyspace, workflowName) - vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + vdiff(t, keyspace, workflowName, "zone1", nil) shardReadsRouteToSource() shardWritesRouteToSource() confirmStates(t, &wf, wrangler.WorkflowStateAllSwitched, wrangler.WorkflowStateNotSwitched) @@ -587,7 +587,7 @@ func splitShard(t *testing.T, keyspace, workflowName, sourceShards, targetShards confirmStates(t, &wf, wrangler.WorkflowStateReadsSwitched, wrangler.WorkflowStateNotSwitched) // Confirm that everything is still in sync after our switch fest. - vdiff(t, keyspace, workflowName, "zone1", false, true, nil) + vdiff(t, keyspace, workflowName, "zone1", nil) rs.SwitchReadsAndWrites() shardReadsRouteToTarget() diff --git a/go/test/endtoend/vreplication/vschema_load_test.go b/go/test/endtoend/vreplication/vschema_load_test.go index 6ca8dcfe472..e14d3be8720 100644 --- a/go/test/endtoend/vreplication/vschema_load_test.go +++ b/go/test/endtoend/vreplication/vschema_load_test.go @@ -120,7 +120,7 @@ func TestVSchemaChangesUnderLoad(t *testing.T) { defer timer.Stop() log.Infof("Started ApplyVSchema") for { - if err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema={}", "product"); err != nil { + if err := vc.VtctldClient.ExecuteCommand("ApplyVSchema", "--vschema={}", "product"); err != nil { log.Errorf("ApplyVSchema command failed with %+v\n", err) return } @@ -140,8 +140,8 @@ func TestVSchemaChangesUnderLoad(t *testing.T) { }() <-ch // wait for enough ApplyVSchema calls before doing a PRS - if err := vc.VtctlClient.ExecuteCommand("PlannedReparentShard", "--", "--keyspace_shard", "product/0", - "--new_primary", "zone1-101", "--wait_replicas_timeout", defaultTimeout.String()); err != nil { + if err := vc.VtctldClient.ExecuteCommand("PlannedReparentShard", "product/0", + "--new-primary", "zone1-101", "--wait-replicas-timeout", defaultTimeout.String()); err != nil { require.NoError(t, err, "PlannedReparentShard command failed") } } diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 3f79a35b569..d055ce91bc1 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -140,7 +140,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { case 1: if failover { insertMu.Lock() - output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "--", "--keyspace_shard=product/0", "--new_primary=zone1-101") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", "product/0", "--new-primary=zone1-101") insertMu.Unlock() log.Infof("output of first PRS is %s", output) require.NoError(t, err) @@ -148,7 +148,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { case 2: if failover { insertMu.Lock() - output, err := vc.VtctlClient.ExecuteCommandWithOutput("PlannedReparentShard", "--", "--keyspace_shard=product/0", "--new_primary=zone1-100") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", "product/0", "--new-primary=zone1-100") insertMu.Unlock() log.Infof("output of second PRS is %s", output) require.NoError(t, err) diff --git a/go/test/endtoend/vreplication/wrappers_test.go b/go/test/endtoend/vreplication/wrappers_test.go index d1fff1af1c6..2ca1b3bb724 100644 --- a/go/test/endtoend/vreplication/wrappers_test.go +++ b/go/test/endtoend/vreplication/wrappers_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/require" "vitess.io/vitess/go/vt/log" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) type iWorkflow interface { @@ -50,17 +49,14 @@ type workflowFlavor int const ( workflowFlavorRandom workflowFlavor = iota - workflowFlavorVtctl workflowFlavorVtctld ) var workflowFlavors = []workflowFlavor{ - workflowFlavorVtctl, workflowFlavorVtctld, } var workflowFlavorNames = map[workflowFlavor]string{ - workflowFlavorVtctl: "vtctl", workflowFlavorVtctld: "vtctld", } @@ -100,8 +96,6 @@ func newMoveTables(vc *VitessCluster, mt *moveTablesWorkflow, flavor workflowFla flavor = workflowFlavors[rand.IntN(len(workflowFlavors))] } switch flavor { - case workflowFlavorVtctl: - mt2 = newVtctlMoveTables(mt) case workflowFlavorVtctld: mt2 = newVtctldMoveTables(mt) default: @@ -111,102 +105,6 @@ func newMoveTables(vc *VitessCluster, mt *moveTablesWorkflow, flavor workflowFla return mt2 } -type VtctlMoveTables struct { - *moveTablesWorkflow -} - -func (vmt *VtctlMoveTables) Flavor() string { - return "vtctl" -} - -func newVtctlMoveTables(mt *moveTablesWorkflow) *VtctlMoveTables { - return &VtctlMoveTables{mt} -} - -func (vmt *VtctlMoveTables) Create() { - currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables - vmt.exec(workflowActionCreate) -} - -func (vmt *VtctlMoveTables) MirrorTraffic() { - // TODO implement me - panic("implement me") -} - -func (vmt *VtctlMoveTables) SwitchReadsAndWrites() { - err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionSwitchTraffic, "", "", "", defaultWorkflowExecOptions) - require.NoError(vmt.vc.t, err) -} - -func (vmt *VtctlMoveTables) ReverseReadsAndWrites() { - err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionReverseTraffic, "", "", "", defaultWorkflowExecOptions) - require.NoError(vmt.vc.t, err) -} - -func (vmt *VtctlMoveTables) Show() { - // TODO implement me - panic("implement me") -} - -func (vmt *VtctlMoveTables) Status() { - currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables - vmt.exec("Status") -} - -func (vmt *VtctlMoveTables) exec(action string) { - options := &workflowExecOptions{ - deferSecondaryKeys: false, - atomicCopy: vmt.atomicCopy, - } - err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, action, vmt.tabletTypes, vmt.sourceShards, "", options) - require.NoError(vmt.vc.t, err) -} -func (vmt *VtctlMoveTables) SwitchReads() { - err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionSwitchTraffic, "replica,rdonly", "", "", defaultWorkflowExecOptions) - require.NoError(vmt.vc.t, err) -} - -func (vmt *VtctlMoveTables) SwitchWrites() { - err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionSwitchTraffic, "primary", "", "", defaultWorkflowExecOptions) - require.NoError(vmt.vc.t, err) -} -func (vmt *VtctlMoveTables) ReverseReads() { - err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionReverseTraffic, "replica,rdonly", "", "", defaultWorkflowExecOptions) - require.NoError(vmt.vc.t, err) -} - -func (vmt *VtctlMoveTables) ReverseWrites() { - err := tstWorkflowExecVtctl(vmt.vc.t, "", vmt.workflowName, vmt.sourceKeyspace, vmt.targetKeyspace, - vmt.tables, workflowActionReverseTraffic, "primary", "", "", defaultWorkflowExecOptions) - require.NoError(vmt.vc.t, err) -} - -func (vmt *VtctlMoveTables) Cancel() { - vmt.exec(workflowActionCancel) -} - -func (vmt *VtctlMoveTables) Complete() { - vmt.exec(workflowActionComplete) -} - -func (vmt *VtctlMoveTables) GetLastOutput() string { - return vmt.lastOutput -} - -func (vmt *VtctlMoveTables) Start() { - panic("implement me") -} - -func (vmt *VtctlMoveTables) Stop() { - panic("implement me") -} - var _ iMoveTables = (*VtctldMoveTables)(nil) type VtctldMoveTables struct { @@ -347,8 +245,6 @@ func newReshard(vc *VitessCluster, rs *reshardWorkflow, flavor workflowFlavor) i flavor = workflowFlavors[rand.IntN(len(workflowFlavors))] } switch flavor { - case workflowFlavorVtctl: - rs2 = newVtctlReshard(rs) case workflowFlavorVtctld: rs2 = newVtctldReshard(rs) default: @@ -358,93 +254,6 @@ func newReshard(vc *VitessCluster, rs *reshardWorkflow, flavor workflowFlavor) i return rs2 } -type VtctlReshard struct { - *reshardWorkflow -} - -func (vrs *VtctlReshard) ReverseReads() { - //TODO implement me - panic("implement me") -} - -func (vrs *VtctlReshard) ReverseWrites() { - //TODO implement me - panic("implement me") -} - -func (vrs *VtctlReshard) Flavor() string { - return "vtctl" -} - -func newVtctlReshard(rs *reshardWorkflow) *VtctlReshard { - return &VtctlReshard{rs} -} - -func (vrs *VtctlReshard) Create() { - currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard - vrs.exec(workflowActionCreate) -} - -func (vrs *VtctlReshard) MirrorTraffic() { - // TODO implement me - panic("implement me") -} - -func (vrs *VtctlReshard) Status() { - currentWorkflowType = binlogdatapb.VReplicationWorkflowType_Reshard - vrs.exec("Status") -} - -func (vrs *VtctlReshard) SwitchReadsAndWrites() { - vrs.exec(workflowActionSwitchTraffic) -} - -func (vrs *VtctlReshard) ReverseReadsAndWrites() { - vrs.exec(workflowActionReverseTraffic) -} - -func (vrs *VtctlReshard) Show() { - // TODO implement me - panic("implement me") -} - -func (vrs *VtctlReshard) exec(action string) { - options := &workflowExecOptions{} - err := tstWorkflowExecVtctl(vrs.vc.t, "", vrs.workflowName, "", vrs.targetKeyspace, - "", action, vrs.tabletTypes, vrs.sourceShards, vrs.targetShards, options) - require.NoError(vrs.vc.t, err) -} - -func (vrs *VtctlReshard) SwitchReads() { - // TODO implement me - panic("implement me") -} - -func (vrs *VtctlReshard) SwitchWrites() { - // TODO implement me - panic("implement me") -} - -func (vrs *VtctlReshard) Cancel() { - vrs.exec(workflowActionCancel) -} - -func (vrs *VtctlReshard) Complete() { - vrs.exec(workflowActionComplete) -} - -func (vrs *VtctlReshard) GetLastOutput() string { - return vrs.lastOutput -} - -func (vrs *VtctlReshard) Start() { - panic("implement me") -} - -func (vrs *VtctlReshard) Stop() { - panic("implement me") -} - var _ iReshard = (*VtctldReshard)(nil) type VtctldReshard struct { diff --git a/go/test/endtoend/vtcombo/recreate/recreate_test.go b/go/test/endtoend/vtcombo/recreate/recreate_test.go index 15cb63c3d7d..496d26c8062 100644 --- a/go/test/endtoend/vtcombo/recreate/recreate_test.go +++ b/go/test/endtoend/vtcombo/recreate/recreate_test.go @@ -132,9 +132,9 @@ func getMySQLConnectionCount(ctx context.Context, session *vtgateconn.VTGateSess } func assertTabletsPresent(t *testing.T) { - tmpCmd := exec.Command("vtctlclient", "--vtctl_client_protocol", "grpc", "--server", grpcAddress, "--stderrthreshold", "0", "ListAllTablets", "--", "test") + tmpCmd := exec.Command("vtctldclient", "--server", grpcAddress, "GetTablets", "--cell", "test") - log.Infof("Running vtctlclient with command: %v", tmpCmd.Args) + log.Infof("Running vtctldclient with command: %v", tmpCmd.Args) output, err := tmpCmd.CombinedOutput() require.Nil(t, err) diff --git a/go/test/endtoend/vtcombo/vttest_sample_test.go b/go/test/endtoend/vtcombo/vttest_sample_test.go index 4895c1195b0..af0decca3d3 100644 --- a/go/test/endtoend/vtcombo/vttest_sample_test.go +++ b/go/test/endtoend/vtcombo/vttest_sample_test.go @@ -238,9 +238,9 @@ func insertManyRows(ctx context.Context, t *testing.T, conn *vtgateconn.VTGateCo } func assertTabletsPresent(t *testing.T) { - tmpCmd := exec.Command("vtctlclient", "--vtctl_client_protocol", "grpc", "--server", grpcAddress, "--stderrthreshold", "0", "ListAllTablets", "--", "test") + tmpCmd := exec.Command("vtctldclient", "--server", grpcAddress, "GetTablets", "--cell", "test") - log.Infof("Running vtctlclient with command: %v", tmpCmd.Args) + log.Infof("Running vtctldclient with command: %v", tmpCmd.Args) output, err := tmpCmd.CombinedOutput() require.NoError(t, err) diff --git a/go/test/endtoend/vtgate/schema/schema_test.go b/go/test/endtoend/vtgate/schema/schema_test.go index 4c28e29ca0d..fd84b5b2793 100644 --- a/go/test/endtoend/vtgate/schema/schema_test.go +++ b/go/test/endtoend/vtgate/schema/schema_test.go @@ -294,7 +294,7 @@ func testCopySchemaShards(t *testing.T, source string, shard int) { checkTablesCount(t, clusterInstance.Keyspaces[0].Shards[shard].Vttablets[1], 0) // Run the command twice to make sure it's idempotent. for i := 0; i < 2; i++ { - err := clusterInstance.VtctlclientProcess.ExecuteCommand("CopySchemaShard", source, fmt.Sprintf("%s/%d", keyspaceName, shard)) + err := clusterInstance.VtctldClientProcess.ExecuteCommand("CopySchemaShard", source, fmt.Sprintf("%s/%d", keyspaceName, shard)) require.Nil(t, err) } // shard2 primary should look the same as the replica we copied from @@ -329,7 +329,7 @@ func testCopySchemaShardWithDifferentDB(t *testing.T, shard int) { err = clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", "--json", tabletAlias, "ALTER DATABASE vt_ks CHARACTER SET latin1") require.Nil(t, err) - output, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("CopySchemaShard", source, fmt.Sprintf("%s/%d", keyspaceName, shard)) + output, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("CopySchemaShard", source, fmt.Sprintf("%s/%d", keyspaceName, shard)) require.Error(t, err) assert.True(t, strings.Contains(output, "schemas are different")) diff --git a/go/test/endtoend/vtorc/api/api_test.go b/go/test/endtoend/vtorc/api/api_test.go index 3fe43fa8f8f..22091e5cce5 100644 --- a/go/test/endtoend/vtorc/api/api_test.go +++ b/go/test/endtoend/vtorc/api/api_test.go @@ -139,7 +139,7 @@ func TestAPIEndpoints(t *testing.T) { }) t.Run("Replication Analysis API", func(t *testing.T) { - // use vtctlclient to stop replication + // use vtctldclient to stop replication _, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("StopReplication", replica.Alias) require.NoError(t, err) diff --git a/go/test/endtoend/vtorc/general/vtorc_test.go b/go/test/endtoend/vtorc/general/vtorc_test.go index a4ed71945be..4a2f50b4168 100644 --- a/go/test/endtoend/vtorc/general/vtorc_test.go +++ b/go/test/endtoend/vtorc/general/vtorc_test.go @@ -216,7 +216,7 @@ func TestVTOrcRepairs(t *testing.T) { }) t.Run("StopReplication", func(t *testing.T) { - // use vtctlclient to stop replication + // use vtctldclient to stop replication _, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("StopReplication", replica.Alias) require.NoError(t, err) diff --git a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go index 9017d35a8c5..b296ef554b2 100644 --- a/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go +++ b/go/test/endtoend/vtorc/primaryfailure/primary_failure_test.go @@ -199,7 +199,7 @@ func TestDeletedPrimaryTablet(t *testing.T) { // Disable VTOrc recoveries vtOrcProcess.DisableGlobalRecoveries(t) - // use vtctlclient to stop replication on the replica + // use vtctldclient to stop replication on the replica _, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("StopReplication", replica.Alias) require.NoError(t, err) // insert a write that is not available on the replica. @@ -209,7 +209,7 @@ func TestDeletedPrimaryTablet(t *testing.T) { _ = curPrimary.VttabletProcess.TearDown() err = curPrimary.MysqlctlProcess.Stop() require.NoError(t, err) - // use vtctlclient to start replication on the replica back + // use vtctldclient to start replication on the replica back _, err = clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("StartReplication", replica.Alias) require.NoError(t, err) err = clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommand("DeleteTablets", "--allow-primary", curPrimary.Alias)