Skip to content

Commit

Permalink
Merge branch 'main' into healthcheck-init-concurrency
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt authored Feb 28, 2024
2 parents bc55057 + 059e50d commit 3016c96
Show file tree
Hide file tree
Showing 79 changed files with 8,890 additions and 7,976 deletions.
11 changes: 11 additions & 0 deletions changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
### Table of Contents

- **[Major Changes](#major-changes)**
- **[Breaking changes](#breaking-changes)**
- [`shutdown_grace_period` Default Change](#shutdown-grace-period-default)
- **[Query Compatibility](#query-compatibility)**
- [Vindex Hints](#vindex-hints)
- [Update with Limit Support](#update-limit)
Expand All @@ -17,6 +19,15 @@

## <a id="major-changes"/>Major Changes

### <a id="breaking-changes"/>Breaking Changes

#### <a id="shutdown-grace-period-default"/>`shutdown_grace_period` Default Change

The `--shutdown_grace_period` flag, which was introduced in v2 with a default of `0 seconds`, has now been changed to default to `3 seconds`.
This makes reparenting in Vitess resilient to client errors, and prevents PlannedReparentShard from timing out.

In order to preserve the old behaviour, the users can set the flag back to `0 seconds` causing open transactions to never be shutdown, but in that case, they run the risk of PlannedReparentShard calls timing out.

### <a id="query-compatibility"/>Query Compatibility

#### <a id="vindex-hints"/> Vindex Hints
Expand Down
51 changes: 36 additions & 15 deletions go/cmd/vttestserver/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"os"
"os/signal"
"path"
"strconv"
"strings"
"syscall"
Expand All @@ -47,13 +48,14 @@ type topoFlags struct {
}

var (
basePort int
config vttest.Config
doSeed bool
mycnf string
protoTopo string
seed vttest.SeedConfig
topo topoFlags
basePort int
config vttest.Config
doSeed bool
mycnf string
protoTopo string
seed vttest.SeedConfig
topo topoFlags
doCreateTCPUser bool
)

func (t *topoFlags) buildTopology() (*vttestpb.VTTestTopology, error) {
Expand Down Expand Up @@ -220,25 +222,44 @@ func New() (cmd *cobra.Command) {
cmd.Flags().StringVar(&config.ExternalTopoGlobalRoot, "external_topo_global_root", "", "the path of the global topology data in the global topology server for vtcombo process")

cmd.Flags().DurationVar(&config.VtgateTabletRefreshInterval, "tablet_refresh_interval", 10*time.Second, "Interval at which vtgate refreshes tablet information from topology server.")

cmd.Flags().BoolVar(&doCreateTCPUser, "initialize-with-vt-dba-tcp", false, "If this flag is enabled, MySQL will be initialized with an additional user named vt_dba_tcp, who will have access via TCP/IP connection.")
acl.RegisterFlags(cmd.Flags())

return cmd
}

func newEnv() (env vttest.Environment, err error) {
if basePort != 0 {
func newEnv() (env *vttest.LocalTestEnv, err error) {
if basePort == 0 {
env, err = vttest.NewLocalTestEnv(0)
} else {
if config.DataDir == "" {
env, err = vttest.NewLocalTestEnv(basePort)
if err != nil {
return
}
} else {
env, err = vttest.NewLocalTestEnvWithDirectory(basePort, config.DataDir)
if err != nil {
return
}
}
}
if err != nil {
return
}

if doCreateTCPUser {
// The original initFile does not have any users who can access through TCP/IP connection.
// Here we update the init file to create the user.
mysqlInitFile := env.InitDBFile
createUserCmd := `
# Admin user for TCP/IP connection with all privileges.
CREATE USER 'vt_dba_tcp'@'%';
GRANT ALL ON *.* TO 'vt_dba_tcp'@'%';
GRANT GRANT OPTION ON *.* TO 'vt_dba_tcp'@'%';
`
newInitFile := path.Join(env.Directory(), "init_db_with_vt_dba_tcp.sql")
err = vttest.WriteInitDBFile(mysqlInitFile, createUserCmd, newInitFile)
if err != nil {
return
}
env.InitDBFile = newInitFile
}

if protoTopo == "" {
config.Topology, err = topo.buildTopology()
Expand Down
33 changes: 33 additions & 0 deletions go/cmd/vttestserver/cli/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,39 @@ func TestForeignKeysAndDDLModes(t *testing.T) {
assert.NoError(t, err)
}

// TestCreateDbaTCPUser tests that the vt_dba_tcp user is created and can connect through TCP/IP connection
// when --initialize-with-vt-dba-tcp is set to true.
func TestCreateDbaTCPUser(t *testing.T) {
conf := config
defer resetConfig(conf)

clusterInstance, err := startCluster("--initialize-with-vt-dba-tcp=true")
assert.NoError(t, err)
defer clusterInstance.TearDown()

defer func() {
if t.Failed() {
cluster.PrintFiles(t, clusterInstance.Env.Directory(), "init_db_with_vt_dba_tcp.sql")
}
}()

// Ensure that the vt_dba_tcp user was created and can connect through TCP/IP connection.
ctx := context.Background()
vtParams := mysql.ConnParams{
Host: "127.0.0.1",
Uname: "vt_dba_tcp",
Port: clusterInstance.Env.PortForProtocol("mysql", ""),
}
conn, err := mysql.Connect(ctx, &vtParams)
assert.NoError(t, err)
defer conn.Close()

// Ensure that the existing vt_dba user remains unaffected, meaning it cannot connect through TCP/IP connection.
vtParams.Uname = "vt_dba"
_, err = mysql.Connect(ctx, &vtParams)
assert.Error(t, err)
}

func TestCanGetKeyspaces(t *testing.T) {
conf := config
defer resetConfig(conf)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtbackup.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ Flags:
--stderrthreshold severityFlag logs at or above this threshold go to stderr (default 1)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ Flags:
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s)
--sql-max-length-errors int truncate queries in error logs to the given length (default unlimited)
--sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512)
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
Expand All @@ -343,7 +343,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtctld.txt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ Flags:
--tablet_health_keep_alive duration close streaming tablet health connection if there are no requests for this long (default 5m0s)
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ Flags:
--table-refresh-interval int interval in milliseconds to refresh tables in status page with refreshRequired class
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ Flags:
--service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
--serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
--shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
--shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s)
--sql-max-length-errors int truncate queries in error logs to the given length (default unlimited)
--sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512)
--srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s)
Expand Down Expand Up @@ -349,7 +349,7 @@ Flags:
--tablet_hostname string if not empty, this hostname will be assumed instead of trying to resolve it
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
3 changes: 2 additions & 1 deletion go/flags/endtoend/vttestserver.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ Flags:
--grpc_server_keepalive_time duration After a duration of this time, if the server doesn't see any activity, it pings the client to see if the transport is still alive. (default 10s)
--grpc_server_keepalive_timeout duration After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that the connection is closed. (default 10s)
-h, --help help for vttestserver
--initialize-with-vt-dba-tcp If this flag is enabled, MySQL will be initialized with an additional user named vt_dba_tcp, who will have access via TCP/IP connection.
--initialize_with_random_data If this flag is each table-shard will be initialized with random data. See also the 'rng_seed' and 'min_shard_size' and 'max_shard_size' flags.
--keep_logs duration keep logs for this long (using ctime) (zero to keep forever)
--keep_logs_by_mtime duration keep logs for this long (using mtime) (zero to keep forever)
Expand Down Expand Up @@ -121,7 +122,7 @@ Flags:
--tablet_hostname string The hostname to use for the tablet otherwise it will be derived from OS' hostname (default "localhost")
--tablet_manager_grpc_ca string the server ca to use to validate servers when connecting
--tablet_manager_grpc_cert string the cert to use to connect
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App} and CheckThrottler) (default 8)
--tablet_manager_grpc_concurrency int concurrency to use to talk to a vttablet server for performance-sensitive RPCs (like ExecuteFetchAs{Dba,App}, CheckThrottler and FullStatus) (default 8)
--tablet_manager_grpc_connpool_size int number of tablets to keep tmclient connections open to (default 100)
--tablet_manager_grpc_crl string the server crl to use to validate server certificates when connecting
--tablet_manager_grpc_key string the key to use to connect
Expand Down
15 changes: 9 additions & 6 deletions go/test/endtoend/tabletgateway/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package healthcheck

import (
"flag"
"fmt"
"os"
"testing"

Expand All @@ -26,11 +27,12 @@ import (
)

var (
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "commerce"
cell = "zone1"
sqlSchema = `create table product(
clusterInstance *cluster.LocalProcessCluster
vtParams mysql.ConnParams
keyspaceName = "commerce"
vtgateGrpcAddress string
cell = "zone1"
sqlSchema = `create table product(
sku varbinary(128),
description varbinary(128),
price bigint,
Expand Down Expand Up @@ -64,7 +66,7 @@ func TestMain(m *testing.M) {

exitCode := func() int {
clusterInstance = cluster.NewCluster(cell, "localhost")
clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s"}
clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s", "--shutdown_grace_period", "3s"}
defer clusterInstance.Teardown()

// Start topo server
Expand Down Expand Up @@ -96,6 +98,7 @@ func TestMain(m *testing.M) {
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
}
vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort)
return m.Run()
}()
os.Exit(exitCode)
Expand Down
39 changes: 39 additions & 0 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

Expand Down Expand Up @@ -283,6 +284,44 @@ func TestReplicaTransactions(t *testing.T) {
assert.Equal(t, `[[INT64(1) VARCHAR("email1")] [INT64(2) VARCHAR("email2")]]`, fmt.Sprintf("%v", qr4.Rows), "we are not able to reconnect after restart")
}

// TestStreamingRPCStuck tests that StreamExecute calls don't get stuck on the vttablets if a client stop reading from a stream.
func TestStreamingRPCStuck(t *testing.T) {
defer cluster.PanicHandler(t)
ctx := context.Background()
vtConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer vtConn.Close()

// We want the table to have enough rows such that a streaming call returns multiple packets.
// Therefore, we insert one row and keep doubling it.
utils.Exec(t, vtConn, "insert into customer(email) values('testemail')")
for i := 0; i < 15; i++ {
// Double the number of rows in customer table.
utils.Exec(t, vtConn, "insert into customer (email) select email from customer")
}

// Connect to vtgate and run a streaming query.
vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "test_user", "")
require.NoError(t, err)
stream, err := vtgateConn.Session("", &querypb.ExecuteOptions{}).StreamExecute(ctx, "select * from customer", map[string]*querypb.BindVariable{})
require.NoError(t, err)

// We read packets until we see the first set of results. This ensures that the stream is working.
for {
res, err := stream.Recv()
require.NoError(t, err)
if res != nil && len(res.Rows) > 0 {
// breaking here stops reading from the stream.
break
}
}

// We simulate a misbehaving client that doesn't read from the stream anymore.
// This however shouldn't block PlannedReparentShard calls.
err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, "0", clusterInstance.Keyspaces[0].Shards[0].Vttablets[1].Alias)
require.NoError(t, err)
}

func getMapFromJSON(JSON map[string]any, key string) map[string]any {
result := make(map[string]any)
object := reflect.ValueOf(JSON[key])
Expand Down
Loading

0 comments on commit 3016c96

Please sign in to comment.