From 93843fd1bc0fa8cf05cce86a33a8538909511b8d Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 28 Sep 2022 22:18:39 +0200 Subject: [PATCH] Use timeout specified for SwitchWrites while locking tables during the SwitchWrites process. Add tests to ensure denied tables are cleaned up when migration is cancelled for partial and full movetables Signed-off-by: Rohit Nayak --- go/test/endtoend/cluster/vttablet_process.go | 16 ++++++++++++ .../resharding_workflows_v2_test.go | 26 ++++++++++++++++--- .../endtoend/vreplication/time_zone_test.go | 13 +++++++++- go/vt/wrangler/traffic_switcher.go | 7 ++++- 4 files changed, 56 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/cluster/vttablet_process.go b/go/test/endtoend/cluster/vttablet_process.go index 46f55f579e3..492754c8d1f 100644 --- a/go/test/endtoend/cluster/vttablet_process.go +++ b/go/test/endtoend/cluster/vttablet_process.go @@ -391,6 +391,22 @@ func (vttablet *VttabletProcess) QueryTablet(query string, keyspace string, useD return executeQuery(conn, query) } +// LockTable locks the specified table and return the connection with the lock. +// Used for failing SwitchWrites which needs to lock tables to complete. +func (vttablet *VttabletProcess) LockTable(table, keyspace string, timeout time.Duration) (*mysql.Conn, error) { + query := fmt.Sprintf("lock tables %s write", table) + dbParams := NewConnParams(vttablet.DbPort, vttablet.DbPassword, path.Join(vttablet.Directory, "mysql.sock"), keyspace) + dbParams.ConnectTimeoutMs = uint64(timeout.Milliseconds()) + conn, err := vttablet.conn(&dbParams) + if err != nil { + return nil, err + } + if _, err := executeQuery(conn, query); err != nil { + return nil, err + } + return conn, nil +} + func (vttablet *VttabletProcess) defaultConn(dbname string) (*mysql.Conn, error) { dbParams := mysql.ConnParams{ Uname: "vt_dba", diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 7b085a9321b..18d770aaeb8 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -23,6 +23,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -259,6 +260,8 @@ func TestBasicV2Workflows(t *testing.T) { log.Flush() } +const lockHoldTimeout = 5 * time.Minute + // TestPartialMoveTables tests partial move tables by moving just one shard // 80- from customer to customer2. func TestPartialMoveTables(t *testing.T) { @@ -302,6 +305,7 @@ func TestPartialMoveTables(t *testing.T) { "customer", workflowActionCreate, "", shard, "") require.NoError(t, err) targetTab1 = vc.getPrimaryTablet(t, moveToKs, shard) + sourceTab1 := vc.getPrimaryTablet(t, "customer", shard) catchup(t, targetTab1, wfName, "Partial MoveTables Customer to Customer2") vdiff1(t, ksWf, "") @@ -315,15 +319,29 @@ func TestPartialMoveTables(t *testing.T) { applyShardRoutingRules(t, emptyRules) require.Equal(t, emptyRules, getShardRoutingRules(t)) - // switch all traffic - require.NoError(t, tstWorkflowExec(t, "", wfName, "", moveToKs, "", workflowActionSwitchTraffic, "", "", "")) + lockConn, err := sourceTab1.LockTable("customer", "customer", lockHoldTimeout) + require.NoError(t, err) + require.NotNil(t, lockConn) + + moveTablesTimeout := "60s" + + // switch all traffic, should fail since we hold a table lock + switchTrafficArgs := []string{"MoveTables", "--", "--timeout", moveTablesTimeout, "--max_replication_lag_allowed=2542087h", "SwitchTraffic", "customer2.partial"} + output, err := vc.VtctlClient.ExecuteCommandWithOutput(switchTrafficArgs...) + require.Errorf(t, err, fmt.Sprintf("%s: %s: %s", time.Now().String(), err, output)) + require.Equal(t, emptyRules, getShardRoutingRules(t)) + + // switch all traffic, should succeed + lockConn.Close() + output, err = vc.VtctlClient.ExecuteCommandWithOutput(switchTrafficArgs...) + require.NoError(t, err, fmt.Sprintf("%s: %s: %s", time.Now().String(), err, output)) expectedSwitchOutput := fmt.Sprintf("SwitchTraffic was successful for workflow customer2.partial\nStart State: Reads Not Switched. Writes Not Switched\nCurrent State: Reads partially switched, for shards: %s. Writes partially switched, for shards: %s\n\n", shard, shard) - require.Equal(t, expectedSwitchOutput, lastOutput) + require.Equal(t, expectedSwitchOutput, output) // Confirm global routing rules -- everything should still be routed // to the source side, customer, globally. - output, err := vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") + output, err = vc.VtctlClient.ExecuteCommandWithOutput("GetRoutingRules") require.NoError(t, err) result := gjson.Get(output, "rules") result.ForEach(func(attributeKey, attributeValue gjson.Result) bool { diff --git a/go/test/endtoend/vreplication/time_zone_test.go b/go/test/endtoend/vreplication/time_zone_test.go index b10cd55e048..c19a0e87947 100644 --- a/go/test/endtoend/vreplication/time_zone_test.go +++ b/go/test/endtoend/vreplication/time_zone_test.go @@ -187,7 +187,18 @@ func TestMoveTablesTZ(t *testing.T) { require.Equal(t, row.AsString("dt2", ""), qrTargetUSPacific.Named().Rows[i].AsString("dt2", "")) require.Equal(t, row.AsString("ts1", ""), qrTargetUSPacific.Named().Rows[i].AsString("ts1", "")) } - output, err = vc.VtctlClient.ExecuteCommandWithOutput("MoveTables", "--", "SwitchTraffic", ksWorkflow) + + lockConn, err := productTab.LockTable("datze", sourceKs, lockHoldTimeout) + require.NoError(t, err) + require.NotNil(t, lockConn) + + moveTablesTimeout := "60s" + switchTrafficArgs := []string{"MoveTables", "--", "--timeout", moveTablesTimeout, "SwitchTraffic", ksWorkflow} + output, err = vc.VtctlClient.ExecuteCommandWithOutput(switchTrafficArgs...) + require.Error(t, err, output) + + lockConn.Close() + output, err = vc.VtctlClient.ExecuteCommandWithOutput(switchTrafficArgs...) require.NoError(t, err, output) qr, err := productTab.QueryTablet(fmt.Sprintf("select * from _vt.vreplication where workflow='%s_reverse'", workflow), "", false) diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 9a07150c117..73415becb67 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -549,8 +549,12 @@ func (wr *Wrangler) SwitchWrites(ctx context.Context, targetKeyspace, workflowNa ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles) // Doing this twice with a pause in-between to catch any writes that may have raced in between // the tablet's deny list check and the first mysqld side table lock. + + // while locking tables wait only till the timeout provided + shortCtx, shortCtxCancel := context.WithTimeout(ctx, timeout) + defer shortCtxCancel() for cnt := 1; cnt <= lockTablesCycles; cnt++ { - if err := ts.executeLockTablesOnSource(ctx); err != nil { + if err := ts.executeLockTablesOnSource(shortCtx); err != nil { ts.Logger().Errorf("Failed to execute LOCK TABLES (attempt %d of %d) on sources: %v", cnt, lockTablesCycles, err) sw.cancelMigration(ctx, sm) return 0, nil, err @@ -1141,6 +1145,7 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { return ts.ForAllSources(func(source *workflow.MigrationSource) error { + ts.Logger().Infof("UpdateSourceDeniedTables for %s, allowWrites %t", source.GetPrimary().AliasString(), access == allowWrites) if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { return si.UpdateSourceDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil {