Skip to content

Commit

Permalink
Use timeout specified for SwitchWrites while locking tables during th…
Browse files Browse the repository at this point in the history
…e SwitchWrites process. Add tests to ensure denied tables are cleaned up when migration is cancelled for partial and full movetables

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Sep 28, 2022
1 parent e2c451b commit 93843fd
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 6 deletions.
16 changes: 16 additions & 0 deletions go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,22 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD
return executeQuery(conn, query)
}

// LockTable locks the specified table and return the connection with the lock.
// Used for failing SwitchWrites which needs to lock tables to complete.
func (vttablet *VttabletProcess) LockTable(table, keyspace string, timeout time.Duration) (*mysql.Conn, error) {
query := fmt.Sprintf("lock tables %s write", table)
dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace)
dbParams.ConnectTimeoutMs = uint64(timeout.Milliseconds())
conn, err := vttablet.conn(&dbParams)
if err != nil {
return nil, err
}
if _, err := executeQuery(conn, query); err != nil {
return nil, err
}
return conn, nil
}

func (vttablet *VttabletProcess) defaultConn(dbname string) (*mysql.Conn, error) {
dbParams := mysql.ConnParams{
Uname: "vt_dba",
Expand Down
26 changes: 22 additions & 4 deletions go/test/endtoend/vreplication/resharding_workflows_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -259,6 +260,8 @@ func TestBasicV2Workflows(t *testing.T) {
log.Flush()
}

const lockHoldTimeout = 5 * time.Minute

// TestPartialMoveTables tests partial move tables by moving just one shard
// 80- from customer to customer2.
func TestPartialMoveTables(t *testing.T) {
Expand Down Expand Up @@ -302,6 +305,7 @@ func TestPartialMoveTables(t *testing.T) {
"customer", workflowActionCreate, "", shard, "")
require.NoError(t, err)
targetTab1 = vc.getPrimaryTablet(t, moveToKs, shard)
sourceTab1 := vc.getPrimaryTablet(t, "customer", shard)
catchup(t, targetTab1, wfName, "Partial MoveTables Customer to Customer2")
vdiff1(t, ksWf, "")

Expand All @@ -315,15 +319,29 @@ func TestPartialMoveTables(t *testing.T) {
applyShardRoutingRules(t, emptyRules)
require.Equal(t, emptyRules, getShardRoutingRules(t))

// switch all traffic
require.NoError(t, tstWorkflowExec(t, "", wfName, "", moveToKs, "", workflowActionSwitchTraffic, "", "", ""))
lockConn, err := sourceTab1.LockTable("customer", "customer", lockHoldTimeout)
require.NoError(t, err)
require.NotNil(t, lockConn)

moveTablesTimeout := "60s"

// switch all traffic, should fail since we hold a table lock
switchTrafficArgs := []string{"MoveTables", "--", "--timeout", moveTablesTimeout, "--max_replication_lag_allowed=2542087h", "SwitchTraffic", "customer2.partial"}
output, err := vc.VtctlClient.ExecuteCommandWithOutput(switchTrafficArgs...)
require.Errorf(t, err, fmt.Sprintf("%s: %s: %s", time.Now().String(), err, output))
require.Equal(t, emptyRules, getShardRoutingRules(t))

// switch all traffic, should succeed
lockConn.Close()
output, err = vc.VtctlClient.ExecuteCommandWithOutput(switchTrafficArgs...)
require.NoError(t, err, fmt.Sprintf("%s: %s: %s", time.Now().String(), err, output))
expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow customer2.partial\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n",
shard, shard)
require.Equal(t, expectedSwitchOutput, lastOutput)
require.Equal(t, expectedSwitchOutput, output)

// Confirm global routing rules -- everything should still be routed
// to the source side, customer, globally.
output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules")
output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules")
require.NoError(t, err)
result := gjson.Get(output, "rules")
result.ForEach(func(attributeKey, attributeValue gjson.Result) bool {
Expand Down
13 changes: 12 additions & 1 deletion go/test/endtoend/vreplication/time_zone_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,18 @@ func TestMoveTablesTZ(t *testing.T) {
require.Equal(t, row.AsString("dt2", ""), qrTargetUSPacific.Named().Rows[i].AsString("dt2", ""))
require.Equal(t, row.AsString("ts1", ""), qrTargetUSPacific.Named().Rows[i].AsString("ts1", ""))
}
output, err = vc.VtctlClient.ExecuteCommandWithOutput("MoveTables", "--", "SwitchTraffic", ksWorkflow)

lockConn, err := productTab.LockTable("datze", sourceKs, lockHoldTimeout)
require.NoError(t, err)
require.NotNil(t, lockConn)

moveTablesTimeout := "60s"
switchTrafficArgs := []string{"MoveTables", "--", "--timeout", moveTablesTimeout, "SwitchTraffic", ksWorkflow}
output, err = vc.VtctlClient.ExecuteCommandWithOutput(switchTrafficArgs...)
require.Error(t, err, output)

lockConn.Close()
output, err = vc.VtctlClient.ExecuteCommandWithOutput(switchTrafficArgs...)
require.NoError(t, err, output)

qr, err := productTab.QueryTablet(fmt.Sprintf("select * from _vt.vreplication where workflow='%s_reverse'", workflow), "", false)
Expand Down
7 changes: 6 additions & 1 deletion go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,8 +549,12 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa
ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles)
// Doing this twice with a pause in-between to catch any writes that may have raced in between
// the tablet's deny list check and the first mysqld side table lock.

// while locking tables wait only till the timeout provided
shortCtx, shortCtxCancel := context.WithTimeout(ctx, timeout)
defer shortCtxCancel()
for cnt := 1; cnt <= lockTablesCycles; cnt++ {
if err := ts.executeLockTablesOnSource(ctx); err != nil {
if err := ts.executeLockTablesOnSource(shortCtx); err != nil {
ts.Logger().Errorf("Failed to execute LOCK TABLES (attempt %d of %d) on sources: %v", cnt, lockTablesCycles, err)
sw.cancelMigration(ctx, sm)
return 0, nil, err
Expand Down Expand Up @@ -1141,6 +1145,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error {

func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error {
return ts.ForAllSources(func(source *workflow.MigrationSource) error {
ts.Logger().Infof("UpdateSourceDeniedTables for %s, allowWrites %t", source.GetPrimary().AliasString(), access == allowWrites)
if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error {
return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables())
}); err != nil {
Expand Down

0 comments on commit 93843fd

Please sign in to comment.