Skip to content

Commit

Permalink
Add new mysql connection drain (#16298)
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui authored Jul 3, 2024
1 parent 79c54e5 commit d236ccd
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 8 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ Flags:
--mysql_default_workload string Default session workload (OLTP, OLAP, DBA) (default "OLTP")
--mysql_port int mysql port (default 3306)
--mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance.
--mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain
--mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms)
--mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1)
--mysql_server_query_timeout duration mysql query timeout
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ Flags:
--mysql_ldap_auth_config_string string JSON representation of LDAP server config.
--mysql_ldap_auth_method string client-side authentication method to use. Supported values: mysql_clear_password, dialog. (default "mysql_clear_password")
--mysql_server_bind_address string Binds on this address when listening to MySQL binary protocol. Useful to restrict listening to 'localhost' only for instance.
--mysql_server_drain_onterm If set, the server waits for --onterm_timeout for connected clients to drain
--mysql_server_flush_delay duration Delay after which buffered response will be flushed to the client. (default 100ms)
--mysql_server_port int If set, also listen for MySQL binary protocol connections on this port. (default -1)
--mysql_server_query_timeout duration mysql query timeout
Expand Down
13 changes: 13 additions & 0 deletions go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,19 @@ func (vtgate *VtgateProcess) WaitForStatusOfTabletInShard(name string, endPoints
return fmt.Errorf("wait for %s failed", name)
}

// IsShutdown checks if the vtgate process is shutdown
func (vtgate *VtgateProcess) IsShutdown() bool {
return !vtgate.WaitForStatus()
}

// Terminate sends a SIGTERM to vtgate
func (vtgate *VtgateProcess) Terminate() error {
if vtgate.proc == nil {
return nil
}
return vtgate.proc.Process.Signal(syscall.SIGTERM)
}

// TearDown shuts down the running vtgate service
func (vtgate *VtgateProcess) TearDown() error {
if vtgate.proc == nil || vtgate.exit == nil {
Expand Down
187 changes: 187 additions & 0 deletions go/test/endtoend/vtgate/connectiondrain/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package connectiondrain

import (
"context"
_ "embed"
"flag"
"os"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
)

var (
keyspaceName = "ks"
cell = "zone-1"

//go:embed schema.sql
schemaSQL string
)

func TestMain(m *testing.M) {
defer cluster.PanicHandler(nil)
flag.Parse()
os.Exit(m.Run())
}

func setupCluster(t *testing.T) (*cluster.LocalProcessCluster, mysql.ConnParams) {
clusterInstance := cluster.NewCluster(cell, "localhost")

// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: schemaSQL,
}
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false)
require.NoError(t, err)

// Start vtgate
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--mysql_server_drain_onterm", "--onterm_timeout", "30s")
err = clusterInstance.StartVtgate()
require.NoError(t, err)

vtParams := clusterInstance.GetVTParams(keyspaceName)
return clusterInstance, vtParams
}

func start(t *testing.T, vtParams mysql.ConnParams) (*mysql.Conn, func()) {
vtConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

deleteAll := func() {
_, _ = utils.ExecAllowError(t, vtConn, "set workload = oltp")

tables := []string{"t1"}
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete from "+table)
}
}

deleteAll()

return vtConn, func() {
deleteAll()
vtConn.Close()
cluster.PanicHandler(t)
}
}

func TestConnectionDrainCloseConnections(t *testing.T) {
clusterInstance, vtParams := setupCluster(t)
defer clusterInstance.Teardown()

vtConn, closer := start(t, vtParams)
defer closer()

// Create a second connection, this connection will be used to create a transaction.
vtConn2, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

// Start the transaction with the second connection
_, err = vtConn2.ExecuteFetch("BEGIN", 1, false)
require.NoError(t, err)
_, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)

_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)

// Tearing down vtgate here, from there on vtConn should still be able to conclude in-flight transaction and
// execute queries with idle connections. However, no new connections are allowed.
err = clusterInstance.VtgateProcess.Terminate()
require.NoError(t, err)

// Give enough time to vtgate to receive and start processing the SIGTERM signal
time.Sleep(2 * time.Second)

// Create a third connection, this connection should not be allowed.
// Set a connection timeout to 1s otherwise the connection will take forever
// and eventually vtgate will reach the --onterm_timeout.
vtParams.ConnectTimeoutMs = 1000
defer func() {
vtParams.ConnectTimeoutMs = 0
}()
_, err = mysql.Connect(context.Background(), &vtParams)
require.Error(t, err)

// Idle connections should be allowed to execute queries until they are drained
_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)

// Finish the transaction
_, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)
_, err = vtConn2.ExecuteFetch("COMMIT", 1, false)
require.NoError(t, err)
vtConn2.Close()

// vtgate should still be running
require.False(t, clusterInstance.VtgateProcess.IsShutdown())

// This connection should still be allowed
_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)
vtConn.Close()

// Give enough time for vtgate to finish all the onterm hooks without reaching the 30s of --onterm_timeout
time.Sleep(10 * time.Second)

// By now the vtgate should have shutdown on its own and without reaching --onterm_timeout
require.True(t, clusterInstance.VtgateProcess.IsShutdown())
}

func TestConnectionDrainOnTermTimeout(t *testing.T) {
clusterInstance, vtParams := setupCluster(t)
defer clusterInstance.Teardown()

// Connect to vtgate again, this should work
vtConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)
vtConn2, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

defer func() {
vtConn.Close()
vtConn2.Close()
}()

// Tearing down vtgate here, we want to reach the onterm_timeout of 30s
err = clusterInstance.VtgateProcess.Terminate()
require.NoError(t, err)

// Run a busy query that returns only after the onterm_timeout is reached, this should fail when we reach the timeout
_, err = vtConn.ExecuteFetch("select sleep(40)", 1, false)
require.Error(t, err)

// Running a query after we have reached the onterm_timeout should fail
_, err = vtConn2.ExecuteFetch("select id from t1", 1, false)
require.Error(t, err)

// By now vtgate will be shutdown becaused it reached its onterm_timeout, despite idle connections still being opened
require.True(t, clusterInstance.VtgateProcess.IsShutdown())
}
5 changes: 5 additions & 0 deletions go/test/endtoend/vtgate/connectiondrain/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create table t1(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
2 changes: 1 addition & 1 deletion go/vt/servenv/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ func Run(bindAddress string, port int) {
signal.Notify(ExitChan, syscall.SIGTERM, syscall.SIGINT)
// Wait for signal
<-ExitChan
l.Close()

startTime := time.Now()
log.Infof("Entering lameduck mode for at least %v", timeouts.LameduckPeriod)
Expand All @@ -71,6 +70,7 @@ func Run(bindAddress string, port int) {
log.Infof("Sleeping an extra %v after OnTermSync to finish lameduck period", remain)
time.Sleep(remain)
}
l.Close()

log.Info("Shutting down gracefully")
fireOnCloseHooks(timeouts.OnCloseTimeout)
Expand Down
44 changes: 37 additions & 7 deletions go/vt/vtgate/plugin_mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ var (

mysqlDefaultWorkloadName = "OLTP"
mysqlDefaultWorkload int32
mysqlDrainOnTerm bool

mysqlServerFlushDelay = 100 * time.Millisecond
)
Expand Down Expand Up @@ -102,6 +103,7 @@ func registerPluginFlags(fs *pflag.FlagSet) {
fs.DurationVar(&mysqlKeepAlivePeriod, "mysql-server-keepalive-period", mysqlKeepAlivePeriod, "TCP period between keep-alives")
fs.DurationVar(&mysqlServerFlushDelay, "mysql_server_flush_delay", mysqlServerFlushDelay, "Delay after which buffered response will be flushed to the client.")
fs.StringVar(&mysqlDefaultWorkloadName, "mysql_default_workload", mysqlDefaultWorkloadName, "Default session workload (OLTP, OLAP, DBA)")
fs.BoolVar(&mysqlDrainOnTerm, "mysql_server_drain_onterm", mysqlDrainOnTerm, "If set, the server waits for --onterm_timeout for connected clients to drain")
}

// vtgateHandler implements the Listener interface.
Expand Down Expand Up @@ -621,18 +623,34 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys
}

func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
if srv.tcpListener != nil {
srv.tcpListener.Shutdown()
srv.tcpListener = nil
if srv.sigChan != nil {
signal.Stop(srv.sigChan)
}
if srv.unixListener != nil {
srv.unixListener.Shutdown()
setListenerToNil := func() {
srv.tcpListener = nil
srv.unixListener = nil
}
if srv.sigChan != nil {
signal.Stop(srv.sigChan)

if mysqlDrainOnTerm {
stopListener(srv.unixListener, false)
stopListener(srv.tcpListener, false)
setListenerToNil()
// We wait for connected clients to drain by themselves or to run into the onterm timeout
log.Infof("Starting drain loop, waiting for all clients to disconnect")
reported := time.Now()
for srv.vtgateHandle.numConnections() > 0 {
if time.Since(reported) > 2*time.Second {
log.Infof("Still waiting for client connections to drain (%d connected)...", srv.vtgateHandle.numConnections())
reported = time.Now()
}
time.Sleep(1000 * time.Millisecond)
}
return
}

stopListener(srv.unixListener, true)
stopListener(srv.tcpListener, true)
setListenerToNil()
if busy := srv.vtgateHandle.busyConnections.Load(); busy > 0 {
log.Infof("Waiting for all client connections to be idle (%d active)...", busy)
start := time.Now()
Expand All @@ -649,6 +667,18 @@ func (srv *mysqlServer) shutdownMysqlProtocolAndDrain() {
}
}

// stopListener Close or Shutdown a mysql listener depending on the shutdown argument.
func stopListener(listener *mysql.Listener, shutdown bool) {
if listener == nil {
return
}
if shutdown {
listener.Shutdown()
} else {
listener.Close()
}
}

func (srv *mysqlServer) rollbackAtShutdown() {
defer log.Flush()
if srv.vtgateHandle == nil {
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,15 @@
"RetryMax": 2,
"Tags": []
},
"vtgate_connectiondrain": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/connectiondrain"],
"Command": [],
"Manual": false,
"Shard": "vtgate_general_heavy",
"RetryMax": 2,
"Tags": []
},
"vtgate_queries_derived": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vtgate/queries/derived"],
Expand Down

0 comments on commit d236ccd

Please sign in to comment.