diff --git a/go/test/endtoend/transaction/twopc/stress/main_test.go b/go/test/endtoend/transaction/twopc/stress/main_test.go index 9c7ed28fa1a..ef8e454be15 100644 --- a/go/test/endtoend/transaction/twopc/stress/main_test.go +++ b/go/test/endtoend/transaction/twopc/stress/main_test.go @@ -71,6 +71,7 @@ func TestMain(m *testing.M) { clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs, "--twopc_enable", "--twopc_abandon_age", "1", + "--migration_check_interval", "2s", ) // Start keyspace diff --git a/go/test/endtoend/transaction/twopc/stress/stress_test.go b/go/test/endtoend/transaction/twopc/stress/stress_test.go index 9912bdf6e19..82991e0c98b 100644 --- a/go/test/endtoend/transaction/twopc/stress/stress_test.go +++ b/go/test/endtoend/transaction/twopc/stress/stress_test.go @@ -34,9 +34,12 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/syscallutil" + "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/onlineddl" twopcutil "vitess.io/vitess/go/test/endtoend/transaction/twopc/utils" "vitess.io/vitess/go/test/endtoend/utils" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/schema" ) // TestDisruptions tests that atomic transactions persevere through various disruptions. @@ -44,12 +47,12 @@ func TestDisruptions(t *testing.T) { testcases := []struct { disruptionName string commitDelayTime string - disruption func() error + disruption func(t *testing.T) error }{ { disruptionName: "No Disruption", commitDelayTime: "1", - disruption: func() error { + disruption: func(t *testing.T) error { return nil }, }, @@ -68,6 +71,11 @@ func TestDisruptions(t *testing.T) { commitDelayTime: "5", disruption: vttabletRestartShard3, }, + { + disruptionName: "OnlineDDL", + commitDelayTime: "20", + disruption: onlineDDL, + }, { disruptionName: "EmergencyReparentShard", commitDelayTime: "5", @@ -119,7 +127,7 @@ func TestDisruptions(t *testing.T) { }() } // Run the disruption. - err := tt.disruption() + err := tt.disruption(t) require.NoError(t, err) // Wait for the commit to have returned. We don't actually check for an error in the commit because the user might receive an error. // But since we are waiting in CommitPrepared, the decision to commit the transaction should have already been taken. @@ -145,6 +153,7 @@ func threadToWrite(t *testing.T, ctx context.Context, id int) { continue } _, _ = utils.ExecAllowError(t, conn, fmt.Sprintf("insert into twopc_t1(id, col) values(%d, %d)", id, rand.Intn(10000))) + conn.Close() } } @@ -170,11 +179,13 @@ func waitForResults(t *testing.T, query string, resultExpected string, waitTime ctx := context.Background() conn, err := mysql.Connect(ctx, &vtParams) if err == nil { - res := utils.Exec(t, conn, query) + res, _ := utils.ExecAllowError(t, conn, query) conn.Close() - prevRes = res.Rows - if fmt.Sprintf("%v", res.Rows) == resultExpected { - return + if res != nil { + prevRes = res.Rows + if fmt.Sprintf("%v", res.Rows) == resultExpected { + return + } } } time.Sleep(100 * time.Millisecond) @@ -187,14 +198,14 @@ Cluster Level Disruptions for the fuzzer */ // prsShard3 runs a PRS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. -func prsShard3() error { +func prsShard3(t *testing.T) error { shard := clusterInstance.Keyspaces[0].Shards[2] newPrimary := shard.Vttablets[1] return clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, shard.Name, newPrimary.Alias) } // ersShard3 runs a ERS in shard 3 of the keyspace. It promotes the second tablet to be the new primary. -func ersShard3() error { +func ersShard3(t *testing.T) error { shard := clusterInstance.Keyspaces[0].Shards[2] newPrimary := shard.Vttablets[1] _, err := clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput("EmergencyReparentShard", fmt.Sprintf("%s/%s", keyspaceName, shard.Name), "--new-primary", newPrimary.Alias) @@ -202,14 +213,14 @@ func ersShard3() error { } // vttabletRestartShard3 restarts the first vttablet of the third shard. -func vttabletRestartShard3() error { +func vttabletRestartShard3(t *testing.T) error { shard := clusterInstance.Keyspaces[0].Shards[2] tablet := shard.Vttablets[0] return tablet.RestartOnlyTablet() } // mysqlRestartShard3 restarts MySQL on the first tablet of the third shard. -func mysqlRestartShard3() error { +func mysqlRestartShard3(t *testing.T) error { shard := clusterInstance.Keyspaces[0].Shards[2] vttablets := shard.Vttablets tablet := vttablets[0] @@ -227,3 +238,72 @@ func mysqlRestartShard3() error { } return syscallutil.Kill(pid, syscall.SIGKILL) } + +var randomDDL = []string{ + "alter table twopc_t1 add column extra_col1 varchar(20)", + "alter table twopc_t1 add column extra_col2 varchar(20)", + "alter table twopc_t1 add column extra_col3 varchar(20)", + "alter table twopc_t1 add column extra_col4 varchar(20)", +} + +var count = 0 + +// onlineDDL runs a DDL statement. +func onlineDDL(t *testing.T) error { + output, err := clusterInstance.VtctldClientProcess.ApplySchemaWithOutput(keyspaceName, randomDDL[count], cluster.ApplySchemaParams{ + DDLStrategy: "vitess --force-cut-over-after=1ms", + }) + require.NoError(t, err) + count++ + fmt.Println("uuid: ", output) + status := WaitForMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), 2*time.Minute, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed) + onlineddl.CheckMigrationStatus(t, &vtParams, clusterInstance.Keyspaces[0].Shards, strings.TrimSpace(output), status) + require.Equal(t, schema.OnlineDDLStatusComplete, status) + return nil +} + +func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus { + shardNames := map[string]bool{} + for _, shard := range shards { + shardNames[shard.Name] = true + } + query := fmt.Sprintf("show vitess_migrations like '%s'", uuid) + + statusesMap := map[string]bool{} + for _, status := range expectStatuses { + statusesMap[string(status)] = true + } + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + lastKnownStatus := "" + for { + countMatchedShards := 0 + conn, err := mysql.Connect(ctx, vtParams) + if err != nil { + continue + } + r := utils.Exec(t, conn, query) + for _, row := range r.Named().Rows { + shardName := row["shard"].ToString() + if !shardNames[shardName] { + // irrelevant shard + continue + } + lastKnownStatus = row["migration_status"].ToString() + if row["migration_uuid"].ToString() == uuid && statusesMap[lastKnownStatus] { + countMatchedShards++ + } + } + if countMatchedShards == len(shards) { + return schema.OnlineDDLStatus(lastKnownStatus) + } + select { + case <-ctx.Done(): + return schema.OnlineDDLStatus(lastKnownStatus) + case <-ticker.C: + } + } +} diff --git a/go/test/endtoend/transaction/twopc/utils/utils.go b/go/test/endtoend/transaction/twopc/utils/utils.go index b3b8796accf..9d0ef838eb6 100644 --- a/go/test/endtoend/transaction/twopc/utils/utils.go +++ b/go/test/endtoend/transaction/twopc/utils/utils.go @@ -61,9 +61,9 @@ func ClearOutTable(t *testing.T, vtParams mysql.ConnParams, tableName string) { return } _, err = conn.ExecuteFetch(fmt.Sprintf("DELETE FROM %v LIMIT 10000", tableName), 10000, false) + conn.Close() if err != nil { fmt.Printf("Error in cleanup deletion - %v\n", err) - conn.Close() time.Sleep(100 * time.Millisecond) continue } diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 395dd5e7a59..e89b40d9c61 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -88,7 +88,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { // This could be due to ongoing cutover happening in vreplication workflow // regarding OnlineDDL or MoveTables. for _, query := range conn.TxProperties().Queries { - qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...) + qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, 0, query.Tables...) if qr != nil { act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{}) if act != rules.QRContinue { @@ -108,7 +108,7 @@ func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error { // If they are put in the prepared pool, then vreplication workflow waits. // This check helps reject the prepare that came later. for _, query := range conn.TxProperties().Queries { - qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, query.PlanType, query.Tables...) + qr := dte.qe.queryRuleSources.FilterByPlan(query.Sql, 0, query.Tables...) if qr != nil { act, _, _, _ := qr.GetAction("", "", nil, sqlparser.MarginComments{}) if act != rules.QRContinue { diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index 95b071f6dbb..f6369cfb978 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -27,7 +27,6 @@ import ( "vitess.io/vitess/go/event/syslogger" "vitess.io/vitess/go/vt/vtenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" @@ -212,9 +211,8 @@ func TestExecutorPrepareRuleFailure(t *testing.T) { sc, err := tsv.te.txPool.GetAndLock(txid, "adding query property") require.NoError(t, err) sc.txProps.Queries = append(sc.txProps.Queries, tx.Query{ - Sql: "update test_table set col = 5", - PlanType: planbuilder.PlanUpdate, - Tables: []string{"test_table"}, + Sql: "update test_table set col = 5", + Tables: []string{"test_table"}, }) sc.Unlock() diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 70d9f67d0a2..e89dee889dc 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -806,7 +806,7 @@ func (qre *QueryExecutor) txFetch(conn *StatefulConnection, record bool) (*sqlty } // Only record successful queries. if record { - conn.TxProperties().RecordQuery(sql, qre.plan.PlanID, qre.plan.TableNames()) + conn.TxProperties().RecordQueryDetail(sql, qre.plan.TableNames()) } return qr, nil } diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 7f863e26df7..e70b575f464 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -36,6 +36,7 @@ import ( "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/sidecardb" "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/test/utils" @@ -241,7 +242,9 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { turnOnTxEngine() assert.EqualValues(t, 1, len(tsv.te.preparedPool.conns), "len(tsv.te.preparedPool.conns)") got := tsv.te.preparedPool.conns["dtid0"].TxProperties().Queries - want := []string{"update test_table set `name` = 2 where pk = 1 limit 10001"} + want := []tx.Query{{ + Sql: "update test_table set `name` = 2 where pk = 1 limit 10001", + Tables: []string{"test_table"}}} utils.MustMatch(t, want, got, "Prepared queries") turnOffTxEngine() assert.Empty(t, tsv.te.preparedPool.conns, "tsv.te.preparedPool.conns") @@ -275,7 +278,9 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { turnOnTxEngine() assert.EqualValues(t, 1, len(tsv.te.preparedPool.conns), "len(tsv.te.preparedPool.conns)") got = tsv.te.preparedPool.conns["a:b:10"].TxProperties().Queries - want = []string{"update test_table set `name` = 2 where pk = 1 limit 10001"} + want = []tx.Query{{ + Sql: "update test_table set `name` = 2 where pk = 1 limit 10001", + Tables: []string{"test_table"}}} utils.MustMatch(t, want, got, "Prepared queries") wantFailed := map[string]error{"a:b:20": errPrepFailed} utils.MustMatch(t, tsv.te.preparedPool.reserved, wantFailed, fmt.Sprintf("Failed dtids: %v, want %v", tsv.te.preparedPool.reserved, wantFailed)) diff --git a/go/vt/vttablet/tabletserver/tx/api.go b/go/vt/vttablet/tabletserver/tx/api.go index a52357c3cb8..48a1cc1107a 100644 --- a/go/vt/vttablet/tabletserver/tx/api.go +++ b/go/vt/vttablet/tabletserver/tx/api.go @@ -25,7 +25,6 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" ) type ( @@ -64,9 +63,8 @@ type ( ) type Query struct { - Sql string - PlanType planbuilder.PlanType - Tables []string + Sql string + Tables []string } const ( @@ -121,15 +119,32 @@ var txNames = map[ReleaseReason]string{ ConnRenewFail: "renewFail", } -// RecordQuery records the query against this transaction. -func (p *Properties) RecordQuery(query string, planType planbuilder.PlanType, tables []string) { +// RecordQueryDetail records the query and tables against this transaction. +func (p *Properties) RecordQueryDetail(query string, tables []string) { if p == nil { return } p.Queries = append(p.Queries, Query{ - Sql: query, - PlanType: planType, - Tables: tables, + Sql: query, + Tables: tables, + }) +} + +// RecordQuery records the query and extract tables against this transaction. +func (p *Properties) RecordQuery(query string, parser *sqlparser.Parser) { + if p == nil { + return + } + stmt, err := parser.Parse(query) + if err != nil { + // This should neven happen, but if it does, + // we would not be able to block cut-overs on this query. + return + } + tables := sqlparser.ExtractAllTables(stmt) + p.Queries = append(p.Queries, Query{ + Sql: query, + Tables: tables, }) } diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 8ad4c359456..3c90cfd6f74 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -427,6 +427,7 @@ outer: continue } for _, stmt := range preparedTx.Queries { + conn.TxProperties().RecordQuery(stmt, te.env.Environment().Parser()) _, err := conn.Exec(ctx, stmt, 1, false) if err != nil { allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid))