Skip to content

Commit

Permalink
Introducing ExecuteMultiFetchAsDba gRPC and `vtctldclient ExecuteMu…
Browse files Browse the repository at this point in the history
…ltiFetchAsDBA` command (#15506)

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Mar 19, 2024
1 parent 1b7b2b8 commit 8833092
Show file tree
Hide file tree
Showing 32 changed files with 8,577 additions and 5,111 deletions.
66 changes: 66 additions & 0 deletions go/cmd/vtctldclient/command/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ var (
RunE: commandExecuteFetchAsDBA,
Aliases: []string{"ExecuteFetchAsDba"},
}
// ExecuteMultiFetchAsDBA makes an ExecuteMultiFetchAsDBA gRPC call to a vtctld.
ExecuteMultiFetchAsDBA = &cobra.Command{
Use: "ExecuteMultiFetchAsDBA [--max-rows <max-rows>] [--json|-j] [--disable-binlogs] [--reload-schema] <tablet alias> <sql>",
Short: "Executes given multiple queries as the DBA user on the remote tablet.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(2),
RunE: commandExecuteMultiFetchAsDBA,
Aliases: []string{"ExecuteMultiFetchAsDba"},
}
)

var executeFetchAsAppOptions = struct {
Expand Down Expand Up @@ -138,6 +147,57 @@ func commandExecuteFetchAsDBA(cmd *cobra.Command, args []string) error {
return nil
}

var executeMultiFetchAsDBAOptions = struct {
MaxRows int64
DisableBinlogs bool
ReloadSchema bool
JSON bool
}{
MaxRows: 10_000,
}

func commandExecuteMultiFetchAsDBA(cmd *cobra.Command, args []string) error {
alias, err := topoproto.ParseTabletAlias(cmd.Flags().Arg(0))
if err != nil {
return err
}

cli.FinishedParsing(cmd)

sql := cmd.Flags().Arg(1)

resp, err := client.ExecuteMultiFetchAsDBA(commandCtx, &vtctldatapb.ExecuteMultiFetchAsDBARequest{
TabletAlias: alias,
Sql: sql,
MaxRows: executeMultiFetchAsDBAOptions.MaxRows,
DisableBinlogs: executeMultiFetchAsDBAOptions.DisableBinlogs,
ReloadSchema: executeMultiFetchAsDBAOptions.ReloadSchema,
})
if err != nil {
return err
}

var qrs []*sqltypes.Result
for _, result := range resp.Results {
qr := sqltypes.Proto3ToResult(result)
qrs = append(qrs, qr)
}

switch executeMultiFetchAsDBAOptions.JSON {
case true:
data, err := cli.MarshalJSON(qrs)
if err != nil {
return err
}
fmt.Printf("%s\n", data)
default:
for _, qr := range qrs {
cli.WriteQueryResultTable(cmd.OutOrStdout(), qr)
}
}
return nil
}

func init() {
ExecuteFetchAsApp.Flags().Int64Var(&executeFetchAsAppOptions.MaxRows, "max-rows", 10_000, "The maximum number of rows to fetch from the remote tablet.")
ExecuteFetchAsApp.Flags().BoolVar(&executeFetchAsAppOptions.UsePool, "use-pool", false, "Use the tablet connection pool instead of creating a fresh connection.")
Expand All @@ -149,4 +209,10 @@ func init() {
ExecuteFetchAsDBA.Flags().BoolVar(&executeFetchAsDBAOptions.ReloadSchema, "reload-schema", false, "Instructs the tablet to reload its schema after executing the query.")
ExecuteFetchAsDBA.Flags().BoolVarP(&executeFetchAsDBAOptions.JSON, "json", "j", false, "Output the results in JSON instead of a human-readable table.")
Root.AddCommand(ExecuteFetchAsDBA)

ExecuteMultiFetchAsDBA.Flags().Int64Var(&executeMultiFetchAsDBAOptions.MaxRows, "max-rows", 10_000, "The maximum number of rows to fetch from the remote tablet.")
ExecuteMultiFetchAsDBA.Flags().BoolVar(&executeMultiFetchAsDBAOptions.DisableBinlogs, "disable-binlogs", false, "Disables binary logging during the query.")
ExecuteMultiFetchAsDBA.Flags().BoolVar(&executeMultiFetchAsDBAOptions.ReloadSchema, "reload-schema", false, "Instructs the tablet to reload its schema after executing the query.")
ExecuteMultiFetchAsDBA.Flags().BoolVarP(&executeMultiFetchAsDBAOptions.JSON, "json", "j", false, "Output the results in JSON instead of a human-readable table.")
Root.AddCommand(ExecuteMultiFetchAsDBA)
}
1 change: 1 addition & 0 deletions go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Available Commands:
ExecuteFetchAsApp Executes the given query as the App user on the remote tablet.
ExecuteFetchAsDBA Executes the given query as the DBA user on the remote tablet.
ExecuteHook Runs the specified hook on the given tablet.
ExecuteMultiFetchAsDBA Executes given multiple queries as the DBA user on the remote tablet.
FindAllShardsInKeyspace Returns a map of shard names to shard references for a given keyspace.
GenerateShardRanges Print a set of shard ranges assuming a keyspace with N shards.
GetBackups Lists backups for the given shard.
Expand Down
5 changes: 3 additions & 2 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,14 @@ func TestVtgateReplicationStatusCheck(t *testing.T) {
// Stop replication on the non-PRIMARY tablets.
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "stop slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave")
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteMultiFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "stop slave")
require.NoError(t, err)
// Restart replication afterward as the cluster is re-used.
defer func() {
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Replica().Alias, "start slave")
require.NoError(t, err)
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave")
// Testing ExecuteMultiFetchAsDBA by running multiple commands in a single call:
_, err = clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteMultiFetchAsDBA", clusterInstance.Keyspaces[0].Shards[0].Rdonly().Alias, "start slave sql_thread; start slave io_thread;")
require.NoError(t, err)
}()
time.Sleep(2 * time.Second) // Build up some replication lag
Expand Down
35 changes: 30 additions & 5 deletions go/test/endtoend/tabletmanager/commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,21 @@ func TestTabletCommands(t *testing.T) {
utils.Exec(t, conn, "insert into t1(id, value) values(1,'a'), (2,'b')")
checkDataOnReplica(t, replicaConn, `[[VARCHAR("a")] [VARCHAR("b")]]`)

// make sure direct dba queries work
sql := "select * from t1"
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", "--json", primaryTablet.Alias, sql)
require.Nil(t, err)
assertExecuteFetch(t, result)
t.Run("ExecuteFetchAsDBA", func(t *testing.T) {
// make sure direct dba queries work
sql := "select * from t1"
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDBA", "--json", primaryTablet.Alias, sql)
require.Nil(t, err)
assertExecuteFetch(t, result)
})

t.Run("ExecuteMultiFetchAsDBA", func(t *testing.T) {
// make sure direct dba queries work
sql := "select * from t1; select * from t1 limit 100"
result, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("ExecuteMultiFetchAsDBA", "--json", primaryTablet.Alias, sql)
require.Nil(t, err)
assertExecuteMultiFetch(t, result)
})
// check Ping / RefreshState / RefreshStateByShard
err = clusterInstance.VtctldClientProcess.ExecuteCommand("PingTablet", primaryTablet.Alias)
require.Nil(t, err, "error should be Nil")
Expand Down Expand Up @@ -139,6 +148,22 @@ func assertExecuteFetch(t *testing.T, qr string) {
want = int(2)
assert.Equal(t, want, got)
}
func assertExecuteMultiFetch(t *testing.T, qr string) {
resultMap := make([]map[string]any, 0)
err := json.Unmarshal([]byte(qr), &resultMap)
require.Nil(t, err)
require.NotEmpty(t, resultMap)

rows := reflect.ValueOf(resultMap[0]["rows"])
got := rows.Len()
want := int(2)
assert.Equal(t, want, got)

fields := reflect.ValueOf(resultMap[0]["fields"])
got = fields.Len()
want = int(2)
assert.Equal(t, want, got)
}

func TestHook(t *testing.T) {
// test a regular program works
Expand Down
Loading

0 comments on commit 8833092

Please sign in to comment.