Skip to content

Commit

Permalink
[release-17.0] Protect ExecuteFetchAsDBA against multi-statements, …
Browse files Browse the repository at this point in the history
…excluding a sequence of `CREATE TABLE|VIEW`. (#14954) (#14983)

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
vitess-bot[bot] and shlomi-noach authored Jan 18, 2024
1 parent 44169e3 commit 383b401
Show file tree
Hide file tree
Showing 13 changed files with 374 additions and 38 deletions.
8 changes: 4 additions & 4 deletions go/test/endtoend/backup/vtbackup/backup_only_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,12 +308,12 @@ func tearDown(t *testing.T, initMysql bool) {
}
caughtUp := waitForReplicationToCatchup([]cluster.Vttablet{*replica1, *replica2})
require.True(t, caughtUp, "Timed out waiting for all replicas to catch up")
promoteCommands := "STOP SLAVE; RESET SLAVE ALL; RESET MASTER;"
disableSemiSyncCommands := "SET GLOBAL rpl_semi_sync_master_enabled = false; SET GLOBAL rpl_semi_sync_slave_enabled = false"
promoteCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "RESET MASTER"}
disableSemiSyncCommands := []string{"SET GLOBAL rpl_semi_sync_master_enabled = false", " SET GLOBAL rpl_semi_sync_slave_enabled = false"}
for _, tablet := range []cluster.Vttablet{*primary, *replica1, *replica2} {
_, err := tablet.VttabletProcess.QueryTablet(promoteCommands, keyspaceName, true)
err := tablet.VttabletProcess.QueryTabletMultiple(promoteCommands, keyspaceName, true)
require.Nil(t, err)
_, err = tablet.VttabletProcess.QueryTablet(disableSemiSyncCommands, keyspaceName, true)
err = tablet.VttabletProcess.QueryTabletMultiple(disableSemiSyncCommands, keyspaceName, true)
require.Nil(t, err)
}

Expand Down
28 changes: 28 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,34 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD
return executeQuery(conn, query)
}

// QueryTabletMultiple lets you execute multiple queries -- without any
// results -- against the tablet.
func (vttablet *VttabletProcess) QueryTabletMultiple(queries []string, keyspace string, useDb bool) error {
conn, err := vttablet.TabletConn(keyspace, useDb)
if err != nil {
return err
}
defer conn.Close()

for _, query := range queries {
log.Infof("Executing query %s (on %s)", query, vttablet.Name)
_, err := executeQuery(conn, query)
if err != nil {
return err
}
}
return nil
}

// TabletConn opens a MySQL connection on this tablet
func (vttablet *VttabletProcess) TabletConn(keyspace string, useDb bool) (*mysql.Conn, error) {
if !useDb {
keyspace = ""
}
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
return vttablet.conn(&dbParams)
}

func (vttablet *VttabletProcess) defaultConn(dbname string) (*mysql.Conn, error) {
dbParams := mysql.ConnParams{
Uname: "vt_dba",
Expand Down
48 changes: 45 additions & 3 deletions go/test/endtoend/clustertest/vtctld_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,51 @@ func testTabletStatus(t *testing.T) {
}

func testExecuteAsDba(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, `SELECT 1 AS a`)
require.NoError(t, err)
assert.Equal(t, result, oneTableOutput)
tcases := []struct {
query string
result string
expectErr bool
}{
{
query: "",
expectErr: true,
},
{
query: "SELECT 1 AS a",
result: oneTableOutput,
},
{
query: "SELECT 1 AS a; SELECT 1 AS a",
expectErr: true,
},
{
query: "create table t(id int)",
result: "",
},
{
query: "create table if not exists t(id int)",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int);",
result: "",
},
{
query: "create table if not exists t(id int); create table if not exists t(id int); SELECT 1 AS a",
expectErr: true,
},
}
for _, tcase := range tcases {
t.Run(tcase.query, func(t *testing.T) {
result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("ExecuteFetchAsDba", clusterInstance.Keyspaces[0].Shards[0].Vttablets[0].Alias, tcase.query)
if tcase.expectErr {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tcase.result, result)
}
})
}
}

func testExecuteAsApp(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/mysqlserver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
PARTITION BY HASH( TO_DAYS(created) )
PARTITIONS 10;
`
createProcSQL = `use vt_test_keyspace;
createProcSQL = `
CREATE PROCEDURE testing()
BEGIN
delete from vt_insert_test;
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestMain(m *testing.M) {
}

primaryTabletProcess := clusterInstance.Keyspaces[0].Shards[0].PrimaryTablet().VttabletProcess
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, false); err != nil {
if _, err := primaryTabletProcess.QueryTablet(createProcSQL, keyspaceName, true); err != nil {
return 1, err
}

Expand Down
5 changes: 4 additions & 1 deletion go/test/endtoend/reparent/emergencyreparent/ers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,11 @@ func TestNoReplicationStatusAndIOThreadStopped(t *testing.T) {
tablets := clusterInstance.Keyspaces[0].Shards[0].Vttablets
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE; RESET SLAVE ALL`)
err := clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `STOP SLAVE`)
require.NoError(t, err)
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[1].Alias, `RESET SLAVE ALL`)
require.NoError(t, err)
//
err = clusterInstance.VtctlclientProcess.ExecuteCommand("ExecuteFetchAsDba", tablets[3].Alias, `STOP SLAVE IO_THREAD;`)
require.NoError(t, err)
// Run an additional command in the current primary which will only be acked by tablets[2] and be in its relay log.
Expand Down
29 changes: 18 additions & 11 deletions go/test/endtoend/reparent/plannedreparent/reparent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func TestPRSWithDrainedLaggingTablet(t *testing.T) {
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[1], tablets[2], tablets[3]})

// make tablets[1 lag from the other tablets by setting the delay to a large number
utils.RunSQL(context.Background(), t, `stop slave;CHANGE MASTER TO MASTER_DELAY = 1999;start slave;`, tablets[1])
utils.RunSQLs(context.Background(), t, []string{`stop slave`, `CHANGE MASTER TO MASTER_DELAY = 1999`, `start slave;`}, tablets[1])

// insert another row in tablets[1
utils.ConfirmReplication(t, tablets[0], []*cluster.Vttablet{tablets[2], tablets[3]})
Expand Down Expand Up @@ -224,26 +224,33 @@ func reparentFromOutside(t *testing.T, clusterInstance *cluster.LocalProcessClus
}

// commands to convert a replica to be writable
promoteReplicaCommands := "STOP SLAVE; RESET SLAVE ALL; SET GLOBAL read_only = OFF;"
utils.RunSQL(ctx, t, promoteReplicaCommands, tablets[1])
promoteReplicaCommands := []string{"STOP SLAVE", "RESET SLAVE ALL", "SET GLOBAL read_only = OFF"}
utils.RunSQLs(ctx, t, promoteReplicaCommands, tablets[1])

// Get primary position
_, gtID := cluster.GetPrimaryPosition(t, *tablets[1], utils.Hostname)

// tablets[0] will now be a replica of tablets[1
changeReplicationSourceCommands := fmt.Sprintf("RESET MASTER; RESET SLAVE; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[0])
changeReplicationSourceCommands := []string{
"RESET MASTER",
"RESET SLAVE",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[0])

// Capture time when we made tablets[1 writable
baseTime := time.Now().UnixNano() / 1000000000

// tablets[2 will be a replica of tablets[1
changeReplicationSourceCommands = fmt.Sprintf("STOP SLAVE; RESET MASTER; SET GLOBAL gtid_purged = '%s';"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", gtID, utils.Hostname, tablets[1].MySQLPort)
utils.RunSQL(ctx, t, changeReplicationSourceCommands, tablets[2])
changeReplicationSourceCommands = []string{
"STOP SLAVE",
"RESET MASTER",
fmt.Sprintf("SET GLOBAL gtid_purged = '%s'", gtID),
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, tablets[1].MySQLPort),
"START SLAVE",
}
utils.RunSQLs(ctx, t, changeReplicationSourceCommands, tablets[2])

// To test the downPrimary, we kill the old primary first and delete its tablet record
if downPrimary {
Expand Down
9 changes: 9 additions & 0 deletions go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ func getMysqlConnParam(tablet *cluster.Vttablet) mysql.ConnParams {
return connParams
}

// RunSQLs is used to run SQL commands directly on the MySQL instance of a vttablet
func RunSQLs(ctx context.Context, t *testing.T, sqls []string, tablet *cluster.Vttablet) (results []*sqltypes.Result) {
for _, sql := range sqls {
result := RunSQL(ctx, t, sql, tablet)
results = append(results, result)
}
return results
}

// RunSQL is used to run a SQL command directly on the MySQL instance of a vttablet
func RunSQL(ctx context.Context, t *testing.T, sql string, tablet *cluster.Vttablet) *sqltypes.Result {
tabletParams := getMysqlConnParam(tablet)
Expand Down
21 changes: 14 additions & 7 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("ReplicationFromOtherReplica", func(t *testing.T) {
// point replica at otherReplica
changeReplicationSourceCommand := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1; START SLAVE", utils.Hostname, otherReplica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommand, replica, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", utils.Hostname, otherReplica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, replica, "")
require.NoError(t, err)

// wait until the source port is set back correctly by vtorc
Expand All @@ -199,10 +203,13 @@ func TestVTOrcRepairs(t *testing.T) {

t.Run("CircularReplication", func(t *testing.T) {
// change the replication source on the primary
changeReplicationSourceCommands := fmt.Sprintf("STOP SLAVE; RESET SLAVE ALL;"+
"CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1;"+
"START SLAVE;", replica.VttabletProcess.TabletHostname, replica.MySQLPort)
_, err := utils.RunSQL(t, changeReplicationSourceCommands, curPrimary, "")
changeReplicationSourceCommands := []string{
"STOP SLAVE",
"RESET SLAVE ALL",
fmt.Sprintf("CHANGE MASTER TO MASTER_HOST='%s', MASTER_PORT=%d, MASTER_USER='vt_repl', MASTER_AUTO_POSITION = 1", replica.VttabletProcess.TabletHostname, replica.MySQLPort),
"START SLAVE",
}
err := utils.RunSQLs(t, changeReplicationSourceCommands, curPrimary, "")
require.NoError(t, err)

// wait for curPrimary to reach stable state
Expand Down
21 changes: 20 additions & 1 deletion go/vt/sqlparser/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,26 @@ func TestSplitStatementToPieces(t *testing.T) {
"`createtime` datetime NOT NULL DEFAULT NOW() COMMENT 'create time;'," +
"`comment` varchar(100) NOT NULL DEFAULT '' COMMENT 'comment'," +
"PRIMARY KEY (`id`))",
}}
}, {
input: "create table t1 (id int primary key); create table t2 (id int primary key);",
output: "create table t1 (id int primary key); create table t2 (id int primary key)",
}, {
input: ";;; create table t1 (id int primary key);;; ;create table t2 (id int primary key);",
output: " create table t1 (id int primary key);create table t2 (id int primary key)",
}, {
// The input doesn't have to be valid SQL statements!
input: ";create table t1 ;create table t2 (id;",
output: "create table t1 ;create table t2 (id",
}, {
// Ignore quoted semicolon
input: ";create table t1 ';';;;create table t2 (id;",
output: "create table t1 ';';create table t2 (id",
}, {
// Ignore quoted semicolon
input: "stop replica; start replica",
output: "stop replica; start replica",
},
}

for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
Expand Down
20 changes: 20 additions & 0 deletions go/vt/sqlparser/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sqlparser
import (
"fmt"
"sort"
"strings"

querypb "vitess.io/vitess/go/vt/proto/query"
)
Expand Down Expand Up @@ -160,3 +161,22 @@ func ReplaceTableQualifiers(query, olddb, newdb string) (string, error) {
}
return query, nil
}

// ReplaceTableQualifiersMultiQuery accepts a multi-query string and modifies it
// via ReplaceTableQualifiers, one query at a time.
func ReplaceTableQualifiersMultiQuery(multiQuery, olddb, newdb string) (string, error) {
queries, err := SplitStatementToPieces(multiQuery)
if err != nil {
return multiQuery, err
}
var modifiedQueries []string
for _, query := range queries {
// Replace any provided sidecar database qualifiers with the correct one.
query, err := ReplaceTableQualifiers(query, olddb, newdb)
if err != nil {
return query, err
}
modifiedQueries = append(modifiedQueries, query)
}
return strings.Join(modifiedQueries, ";"), nil
}
68 changes: 68 additions & 0 deletions go/vt/sqlparser/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,71 @@ func TestReplaceTableQualifiers(t *testing.T) {
})
}
}

func TestReplaceTableQualifiersMultiQuery(t *testing.T) {
origDB := "_vt"
tests := []struct {
name string
in string
newdb string
out string
wantErr bool
}{
{
name: "invalid select",
in: "select frog bar person",
out: "",
wantErr: true,
},
{
name: "simple select",
in: "select * from _vt.foo",
out: "select * from foo",
},
{
name: "simple select with new db",
in: "select * from _vt.foo",
newdb: "_vt_test",
out: "select * from _vt_test.foo",
},
{
name: "simple select with new db same",
in: "select * from _vt.foo where id=1", // should be unchanged
newdb: "_vt",
out: "select * from _vt.foo where id=1",
},
{
name: "simple select with new db needing escaping",
in: "select * from _vt.foo",
newdb: "1_vt-test",
out: "select * from `1_vt-test`.foo",
},
{
name: "multi query",
in: "select * from _vt.foo ; select * from _vt.bar",
out: "select * from foo;select * from bar",
},
{
name: "multi query with new db",
in: "select * from _vt.foo ; select * from _vt.bar",
newdb: "_vt_test",
out: "select * from _vt_test.foo;select * from _vt_test.bar",
},
{
name: "multi query with error",
in: "select * from _vt.foo ; select * from _vt.bar ; sel ect fr om wh at",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := ReplaceTableQualifiersMultiQuery(tt.in, origDB, tt.newdb)
if tt.wantErr {
require.Error(t, err)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.out, got, "RemoveTableQualifiers(); in: %s, out: %s", tt.in, got)
})
}
}
Loading

0 comments on commit 383b401

Please sign in to comment.