From 3b09eb25e89cfb5735e64e0e26ddbbabc8fd7ad6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 20 May 2024 17:14:56 -0400 Subject: [PATCH] SchemaEngine: Ensure GetTableForPos returns table schema for "current" position by default (#15912) Signed-off-by: Matt Lord --- go/mysql/fakesqldb/server.go | 23 ++- go/test/endtoend/vreplication/cluster_test.go | 3 +- .../resharding_workflows_v2_test.go | 6 +- .../tabletmanager/vreplication/controller.go | 8 +- .../vreplication/framework_test.go | 3 - .../vreplication/journal_test.go | 7 - .../vreplication/vcopier_test.go | 14 -- .../vreplication/vplayer_flaky_test.go | 37 ---- go/vt/vttablet/tabletserver/schema/engine.go | 82 ++++++-- .../tabletserver/schema/engine_test.go | 194 +++++++++++++++++- .../tabletserver/schema/historian_test.go | 37 ++-- .../vstreamer/helper_event_test.go | 17 -- .../vstreamer/resultstreamer_test.go | 1 - .../tabletserver/vstreamer/rowstreamer.go | 18 +- .../vstreamer/rowstreamer_test.go | 25 +-- .../vstreamer/tablestreamer_test.go | 2 - .../vstreamer/uvstreamer_flaky_test.go | 1 - .../tabletserver/vstreamer/vstreamer.go | 2 +- .../tabletserver/vstreamer/vstreamer_test.go | 19 +- 19 files changed, 316 insertions(+), 183 deletions(-) diff --git a/go/mysql/fakesqldb/server.go b/go/mysql/fakesqldb/server.go index d4ad5ad9dac..33512f23514 100644 --- a/go/mysql/fakesqldb/server.go +++ b/go/mysql/fakesqldb/server.go @@ -33,12 +33,16 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtenv" + + querypb "vitess.io/vitess/go/vt/proto/query" ) -const appendEntry = -1 +const ( + appendEntry = -1 + useQuery = "use `fakesqldb`" +) // DB is a fake database and all its methods are thread safe. It // creates a mysql.Listener and implements the mysql.Handler @@ -200,7 +204,7 @@ func New(t testing.TB) *DB { db.listener.Accept() }() - db.AddQuery("use `fakesqldb`", &sqltypes.Result{}) + db.AddQuery(useQuery, &sqltypes.Result{}) // Return the db. return db } @@ -598,6 +602,8 @@ func (db *DB) RejectQueryPattern(queryPattern, error string) { // ClearQueryPattern removes all query patterns set up func (db *DB) ClearQueryPattern() { + db.mu.Lock() + defer db.mu.Unlock() db.patternData = make(map[string]exprResult) } @@ -617,6 +623,17 @@ func (db *DB) DeleteQuery(query string) { delete(db.queryCalled, key) } +// DeleteAllQueries deletes all expected queries from the fake DB. +func (db *DB) DeleteAllQueries() { + db.mu.Lock() + defer db.mu.Unlock() + clear(db.data) + clear(db.patternData) + clear(db.queryCalled) + // Use is always expected to be present. + db.data[useQuery] = &ExpectedResult{&sqltypes.Result{}, nil} +} + // AddRejectedQuery adds a query which will be rejected at execution time. func (db *DB) AddRejectedQuery(query string, err error) { db.mu.Lock() diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index ce8c0bc916c..ddd323f7d3f 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -519,10 +519,9 @@ func (vc *VitessCluster) AddTablet(t testing.TB, cell *Cell, keyspace *Keyspace, tablet := &Tablet{} options := []string{ - "--queryserver-config-schema-reload-time", "5s", "--heartbeat_on_demand_duration", "5s", "--heartbeat_interval", "250ms", - } // FIXME: for multi-cell initial schema doesn't seem to load without "--queryserver-config-schema-reload-time" + } options = append(options, extraVTTabletArgs...) if mainClusterConfig.vreplicationCompressGTID { diff --git a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go index 34a4b269b53..82c859acb40 100644 --- a/go/test/endtoend/vreplication/resharding_workflows_v2_test.go +++ b/go/test/endtoend/vreplication/resharding_workflows_v2_test.go @@ -505,8 +505,10 @@ func testVSchemaForSequenceAfterMoveTables(t *testing.T) { execVtgateQuery(t, vtgateConn, "product", "insert into customer2(name) values('a')") } waitForRowCount(t, vtgateConn, "product", "customer2", 3+num+num) - want = fmt.Sprintf("[[INT32(%d)]]", 100+num+num-1) - waitForQueryResult(t, vtgateConn, "product", "select max(cid) from customer2", want) + res := execVtgateQuery(t, vtgateConn, "product", "select max(cid) from customer2") + cid, err := res.Rows[0][0].ToInt() + require.NoError(t, err) + require.GreaterOrEqual(t, cid, 100+num+num-1) } // testReplicatingWithPKEnumCols ensures that we properly apply binlog events diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index a5c7c2a95d4..581244eebb3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -24,18 +24,16 @@ import ( "sync/atomic" "time" - "vitess.io/vitess/go/vt/vttablet" - "google.golang.org/protobuf/encoding/prototext" - "vitess.io/vitess/go/vt/discovery" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index f4c3525e4d7..04c4c8f3e41 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -256,7 +256,6 @@ func addTablet(id int) *topodatapb.Tablet { if err := env.TopoServ.CreateTablet(context.Background(), tablet); err != nil { panic(err) } - env.SchemaEngine.Reload(context.Background()) return tablet } @@ -277,7 +276,6 @@ func addOtherTablet(id int, keyspace, shard string) *topodatapb.Tablet { if err := env.TopoServ.CreateTablet(context.Background(), tablet); err != nil { panic(err) } - env.SchemaEngine.Reload(context.Background()) return tablet } @@ -285,7 +283,6 @@ func deleteTablet(tablet *topodatapb.Tablet) { env.TopoServ.DeleteTablet(context.Background(), tablet.Alias) // This is not automatically removed from shard replication, which results in log spam. topo.DeleteTabletReplicationData(context.Background(), env.TopoServ, tablet) - env.SchemaEngine.Reload(context.Background()) } // fakeTabletConn implement TabletConn interface. We only care about the diff --git a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go index b9dc716d56f..18dbe1e7fd8 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/journal_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/journal_test.go @@ -20,8 +20,6 @@ import ( "fmt" "testing" - "context" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" qh "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication/queryhistory" ) @@ -38,7 +36,6 @@ func TestJournalOneToOne(t *testing.T) { "drop table t", fmt.Sprintf("drop table %s.t", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -98,7 +95,6 @@ func TestJournalOneToMany(t *testing.T) { "drop table t", fmt.Sprintf("drop table %s.t", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -164,7 +160,6 @@ func TestJournalTablePresent(t *testing.T) { "drop table t", fmt.Sprintf("drop table %s.t", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -223,7 +218,6 @@ func TestJournalTableNotPresent(t *testing.T) { "drop table t", fmt.Sprintf("drop table %s.t", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -278,7 +272,6 @@ func TestJournalTableMixed(t *testing.T) { fmt.Sprintf("drop table %s.t", vrepldb), fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go index 2e7c539e840..8f23f28c87d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_test.go @@ -116,7 +116,6 @@ func testPlayerCopyCharPK(t *testing.T) { "drop table src", fmt.Sprintf("drop table %s.dst", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) count := 0 vstreamRowsSendHook = func(ctx context.Context) { @@ -223,7 +222,6 @@ func testPlayerCopyVarcharPKCaseInsensitive(t *testing.T) { "drop table src", fmt.Sprintf("drop table %s.dst", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) count := 0 vstreamRowsSendHook = func(ctx context.Context) { @@ -453,7 +451,6 @@ func testPlayerCopyTablesWithFK(t *testing.T) { "drop table src2", fmt.Sprintf("drop table %s.dst2", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -581,7 +578,6 @@ func testPlayerCopyTables(t *testing.T) { fmt.Sprintf("drop table %s.yes", vrepldb), "drop table no", }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -708,7 +704,6 @@ func testPlayerCopyBigTable(t *testing.T) { "drop table src", fmt.Sprintf("drop table %s.dst", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) count := 0 vstreamRowsSendHook = func(ctx context.Context) { @@ -839,7 +834,6 @@ func testPlayerCopyWildcardRule(t *testing.T) { "drop table src", fmt.Sprintf("drop table %s.src", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) count := 0 vstreamRowsSendHook = func(ctx context.Context) { @@ -968,7 +962,6 @@ func testPlayerCopyTableContinuation(t *testing.T) { "drop table src1", fmt.Sprintf("drop table %s.dst1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1135,7 +1128,6 @@ func testPlayerCopyWildcardTableContinuation(t *testing.T) { "drop table src", fmt.Sprintf("drop table %s.dst", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1232,7 +1224,6 @@ func TestPlayerCopyWildcardTableContinuationWithOptimizeInserts(t *testing.T) { "drop table src", fmt.Sprintf("drop table %s.dst", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1358,7 +1349,6 @@ func testPlayerCopyTablesStopAfterCopy(t *testing.T) { "drop table src1", fmt.Sprintf("drop table %s.dst1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1444,7 +1434,6 @@ func testPlayerCopyTablesGIPK(t *testing.T) { "drop table src2", fmt.Sprintf("drop table %s.dst2", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1535,7 +1524,6 @@ func testPlayerCopyTableCancel(t *testing.T) { "drop table src1", fmt.Sprintf("drop table %s.dst1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) saveTimeout := vttablet.CopyPhaseDuration vttablet.CopyPhaseDuration = 1 * time.Millisecond @@ -1626,7 +1614,6 @@ func testPlayerCopyTablesWithGeneratedColumn(t *testing.T) { "drop table src2", fmt.Sprintf("drop table %s.dst2", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1714,7 +1701,6 @@ func testCopyTablesWithInvalidDates(t *testing.T) { "drop table src1", fmt.Sprintf("drop table %s.dst1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 5beb85e8450..f79f7a42744 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -66,7 +66,6 @@ func TestPlayerGeneratedInvisiblePrimaryKey(t *testing.T) { "drop table t2", fmt.Sprintf("drop table %s.t2", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -144,7 +143,6 @@ func TestPlayerInvisibleColumns(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -241,7 +239,6 @@ func TestVReplicationTimeUpdated(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -308,7 +305,6 @@ func TestCharPK(t *testing.T) { "drop table t4", fmt.Sprintf("drop table %s.t4", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -423,7 +419,6 @@ func TestRollup(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -480,7 +475,6 @@ func TestPlayerSavepoint(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -548,8 +542,6 @@ func TestPlayerForeignKeyCheck(t *testing.T) { fmt.Sprintf("drop table %s.parent", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) - filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "/.*", @@ -593,7 +585,6 @@ func TestPlayerStatementModeWithFilter(t *testing.T) { defer execStatements(t, []string{ "drop table src1", }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -638,7 +629,6 @@ func TestPlayerStatementMode(t *testing.T) { "drop table src1", fmt.Sprintf("drop table %s.src1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -713,7 +703,6 @@ func TestPlayerFilters(t *testing.T) { "drop table src_charset", fmt.Sprintf("drop table %s.dst_charset", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1030,7 +1019,6 @@ func TestPlayerKeywordNames(t *testing.T) { "drop table `commit`", fmt.Sprintf("drop table %s.`commit`", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1216,7 +1204,6 @@ func TestPlayerKeyspaceID(t *testing.T) { "drop table src1", fmt.Sprintf("drop table %s.dst1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) if err := env.SetVSchema(shardedVSchema); err != nil { t.Fatal(err) @@ -1278,7 +1265,6 @@ func TestUnicode(t *testing.T) { "drop table src1", fmt.Sprintf("drop table %s.dst1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1351,7 +1337,6 @@ func TestPlayerUpdates(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1467,7 +1452,6 @@ func TestPlayerRowMove(t *testing.T) { "drop table src", fmt.Sprintf("drop table %s.dst", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1561,8 +1545,6 @@ func TestPlayerTypes(t *testing.T) { fmt.Sprintf("drop table %s.vitess_json", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) - filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "/.*", @@ -1689,7 +1671,6 @@ func TestPlayerDDL(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1880,7 +1861,6 @@ func TestPlayerStopPos(t *testing.T) { fmt.Sprintf("drop table %s.yes", vrepldb), "drop table no", }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -1980,7 +1960,6 @@ func TestPlayerStopAtOther(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) // Insert a source row. execStatements(t, []string{ @@ -2090,7 +2069,6 @@ func TestPlayerIdleUpdate(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2144,7 +2122,6 @@ func TestPlayerSplitTransaction(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2188,7 +2165,6 @@ func TestPlayerLockErrors(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2268,7 +2244,6 @@ func TestPlayerCancelOnLock(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2346,7 +2321,6 @@ func TestPlayerTransactions(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2452,7 +2426,6 @@ func TestPlayerRelayLogMaxSize(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2547,7 +2520,6 @@ func TestRestartOnVStreamEnd(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2602,7 +2574,6 @@ func TestTimestamp(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2655,8 +2626,6 @@ func TestPlayerJSONDocs(t *testing.T) { fmt.Sprintf("drop table %s.vitess_json", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) - filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "/.*", @@ -2730,8 +2699,6 @@ func TestPlayerJSONTwoColumns(t *testing.T) { fmt.Sprintf("drop table %s.vitess_json2", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) - filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "/.*", @@ -2823,7 +2790,6 @@ func TestGeneratedColumns(t *testing.T) { "drop table t2", fmt.Sprintf("drop table %s.t2", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -2900,7 +2866,6 @@ func TestPlayerInvalidDates(t *testing.T) { }) pos := primaryPosition(t) execStatements(t, []string{"set sql_mode=''", "insert into src1 values(1, '0000-00-00')", "set sql_mode='STRICT_TRANS_TABLES'"}) - env.SchemaEngine.Reload(context.Background()) // default mysql flavor allows invalid dates: so disallow explicitly for this test if err := env.Mysqld.ExecuteSuperQuery(context.Background(), "SET @@global.sql_mode=REPLACE(REPLACE(@@session.sql_mode, 'NO_ZERO_DATE', ''), 'NO_ZERO_IN_DATE', '')"); err != nil { @@ -2986,7 +2951,6 @@ func TestPlayerNoBlob(t *testing.T) { "drop table t2", fmt.Sprintf("drop table %s.t2", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -3124,7 +3088,6 @@ func TestPlayerBatchMode(t *testing.T) { "drop table t1", fmt.Sprintf("drop table %s.t1", vrepldb), }) - env.SchemaEngine.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 0bf2ebefd54..ddc1b376628 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -28,13 +28,11 @@ import ( "golang.org/x/exp/maps" + "vitess.io/vitess/go/acl" "vitess.io/vitess/go/constants/sidecar" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/vtenv" - - "vitess.io/vitess/go/acl" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/timer" @@ -47,6 +45,7 @@ import ( "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sidecardb" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -532,7 +531,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error { dropped := se.getDroppedTables(curTables, changedViews, mismatchTables) - // Populate PKColumns for changed tables. + // Populate PK Columns for changed tables. if err := se.populatePrimaryKeys(ctx, conn.Conn, changedTables); err != nil { return err } @@ -670,8 +669,14 @@ func (se *Engine) RegisterVersionEvent() error { return se.historian.RegisterVersionEvent() } -// GetTableForPos returns a best-effort schema for a specific gtid -func (se *Engine) GetTableForPos(tableName sqlparser.IdentifierCS, gtid string) (*binlogdatapb.MinimalTable, error) { +// GetTableForPos makes a best-effort attempt to return a table's schema at a specific +// GTID/position. If it cannot get the table schema for the given GTID/position then it +// returns the latest table schema that is available in the database -- the table schema +// for the "current" GTID/position (updating the cache entry). If the table is not found +// in the cache, it will reload the cache from the database in case the table was created +// after the last schema reload or the cache has not yet been initialized. This function +// makes the schema cache a read-through cache for VReplication purposes. +func (se *Engine) GetTableForPos(ctx context.Context, tableName sqlparser.IdentifierCS, gtid string) (*binlogdatapb.MinimalTable, error) { mt, err := se.historian.GetTableForPos(tableName, gtid) if err != nil { log.Infof("GetTableForPos returned error: %s", err.Error()) @@ -680,19 +685,66 @@ func (se *Engine) GetTableForPos(tableName sqlparser.IdentifierCS, gtid string) if mt != nil { return mt, nil } + // We got nothing from the historian, which typically means that it's not enabled. se.mu.Lock() defer se.mu.Unlock() tableNameStr := tableName.String() - st, ok := se.tables[tableNameStr] - if !ok { - if schema.IsInternalOperationTableName(tableNameStr) { - log.Infof("internal table %v found in vttablet schema: skipping for GTID search", tableNameStr) - } else { - log.Infof("table %v not found in vttablet schema, current tables: %v", tableNameStr, se.tables) - return nil, fmt.Errorf("table %v not found in vttablet schema", tableNameStr) + if st, ok := se.tables[tableNameStr]; ok && tableNameStr != "dual" { // No need to refresh dual + // Test Engines (NewEngineForTests()) don't have a conns pool and are not + // supposed to talk to the database, so don't update the cache entry in that + // case. + if se.conns == nil { + return newMinimalTable(st), nil + } + // We have the table in our cache. Let's be sure that our table definition is + // up-to-date for the "current" position. + conn, err := se.conns.Get(ctx, nil) + if err != nil { + return nil, err + } + defer conn.Recycle() + cst := *st // Make a copy + cst.Fields = nil // We're going to refresh the columns/fields + if err := fetchColumns(&cst, conn, se.cp.DBName(), tableNameStr); err != nil { + return nil, err + } + // Update the PK columns for the table as well as they may have changed. + cst.PKColumns = nil // We're going to repopulate the PK columns + if err := se.populatePrimaryKeys(ctx, conn.Conn, map[string]*Table{tableNameStr: &cst}); err != nil { + return nil, err + } + se.tables[tableNameStr] = &cst + return newMinimalTable(&cst), nil + } + // It's expected that internal tables are not found within VReplication workflows. + // No need to refresh the cache for internal tables. + if schema.IsInternalOperationTableName(tableNameStr) { + log.Infof("internal table %v found in vttablet schema: skipping for GTID search", tableNameStr) + return nil, nil + } + // We don't currently have the non-internal table in the cache. This can happen when + // a table was created after the last schema reload (which happens at least every + // --queryserver-config-schema-reload-time). + // Whatever the reason, we should ensure that our cache is able to get the latest + // table schema for the "current" position IF the table exists in the database. + // In order to ensure this, we need to reload the latest schema so that our cache + // is up to date. This effectively turns our in-memory cache into a read-through + // cache for VReplication related needs (this function is only used by vstreamers). + // This adds an additional cost, but for VReplication it should be rare that we are + // trying to replicate a table that doesn't actually exist. + // This also allows us to perform a just-in-time initialization of the cache if + // a vstreamer is the first one to access it. + if se.conns != nil { // Test Engines (NewEngineForTests()) don't have a conns pool + if err := se.reload(ctx, true); err != nil { + return nil, err + } + if st, ok := se.tables[tableNameStr]; ok { + return newMinimalTable(st), nil } } - return newMinimalTable(st), nil + + log.Infof("table %v not found in vttablet schema, current tables: %v", tableNameStr, se.tables) + return nil, fmt.Errorf("table %v not found in vttablet schema", tableNameStr) } // RegisterNotifier registers the function for schema change notification. diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index b6cdf244297..b3a8b1e2971 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -21,6 +21,7 @@ import ( "errors" "expvar" "fmt" + "math/rand/v2" "net/http" "net/http/httptest" "sort" @@ -41,12 +42,14 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/vt/dbconfigs" - querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema/schematest" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" ) const baseShowTablesPattern = `SELECT t\.table_name.*` @@ -1299,3 +1302,192 @@ func TestEngineReload(t *testing.T) { require.NoError(t, err) require.NoError(t, db.LastError()) } + +// TestEngineReload tests the vreplication specific GetTableForPos function to ensure +// that it conforms to the intended/expected behavior in various scenarios. +// This more specifically tests the behavior of the function when the historian is +// disabled or otherwise unable to get a table schema for the given position. When it +// CAN, that is tested indepenently in the historian tests. +func TestGetTableForPos(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + fakedb := fakesqldb.New(t) + cfg := tabletenv.NewDefaultConfig() + cfg.DB = newDBConfigs(fakedb) + table := sqlparser.NewIdentifierCS("t1") + column := "col1" + tableSchema := fmt.Sprintf("create table %s (%s varchar(50), primary key(col1))", table.String(), column) + tableMt := &binlogdatapb.MinimalTable{ + Name: table.String(), + Fields: []*querypb.Field{ + { + Name: column, + Type: sqltypes.VarChar, + }, + }, + PKColumns: []int64{0}, // First column: col1 + } + + // Don't do any automatic / TTL based cache refreshes. + se := newEngine(1*time.Hour, 1*time.Hour, 0, fakedb) + se.conns.Open(se.cp, se.cp, se.cp) + se.isOpen = true + se.notifiers = make(map[string]notifier) + se.MakePrimary(true) + se.historian.enabled = false + + addExpectedReloadQueries := func(db *fakesqldb.DB) { + db.AddQuery("SELECT UNIX_TIMESTAMP()", sqltypes.MakeTestResult(sqltypes.MakeTestFields( + "UNIX_TIMESTAMP()", + "int64"), + fmt.Sprintf("%d", time.Now().Unix()), + )) + db.AddQuery(fmt.Sprintf(detectViewChange, sidecar.GetIdentifier()), sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name", "varchar"))) + db.AddQuery(fmt.Sprintf(readTableCreateTimes, sidecar.GetIdentifier()), + sqltypes.MakeTestResult(sqltypes.MakeTestFields("table_name|create_time", "varchar|int64"))) + db.AddQuery(fmt.Sprintf(detectUdfChange, sidecar.GetIdentifier()), &sqltypes.Result{}) + db.AddQueryPattern(baseShowTablesPattern, + &sqltypes.Result{ + Fields: mysql.BaseShowTablesFields, + RowsAffected: 0, + InsertID: 0, + Rows: [][]sqltypes.Value{ + { + sqltypes.MakeTrusted(sqltypes.VarChar, []byte(table.String())), // table_name + sqltypes.MakeTrusted(sqltypes.VarChar, []byte("BASE TABLE")), // table_type + sqltypes.MakeTrusted(sqltypes.Int64, []byte(fmt.Sprintf("%d", time.Now().Unix()-1000))), // unix_timestamp(t.create_time) + sqltypes.MakeTrusted(sqltypes.VarChar, []byte("")), // table_comment + sqltypes.MakeTrusted(sqltypes.Int64, []byte("128")), // file_size + sqltypes.MakeTrusted(sqltypes.Int64, []byte("256")), // allocated_size + }, + }, + SessionStateChanges: "", + StatusFlags: 0, + }, + ) + db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{ + Fields: mysql.ShowPrimaryFields, + Rows: [][]sqltypes.Value{ + mysql.ShowPrimaryRow(table.String(), column), + }, + }) + db.AddQueryPattern(fmt.Sprintf(mysql.GetColumnNamesQueryPatternForTable, table.String()), + sqltypes.MakeTestResult(sqltypes.MakeTestFields("column_name", "varchar"), column)) + db.AddQuery(fmt.Sprintf("SELECT `%s` FROM `fakesqldb`.`%v` WHERE 1 != 1", column, table.String()), + sqltypes.MakeTestResult(sqltypes.MakeTestFields(column, "varchar"))) + db.AddQuery(fmt.Sprintf(`show create table %s`, table.String()), + sqltypes.MakeTestResult(sqltypes.MakeTestFields("Table|Create Table", "varchar|varchar"), table.String(), tableSchema)) + db.AddQuery("begin", &sqltypes.Result{}) + db.AddQuery(fmt.Sprintf("delete from %s.`tables` where TABLE_SCHEMA = database() and TABLE_NAME in ('%s')", + sidecar.GetIdentifier(), table.String()), &sqltypes.Result{}) + db.AddQuery(fmt.Sprintf("insert into %s.`tables`(TABLE_SCHEMA, TABLE_NAME, CREATE_STATEMENT, CREATE_TIME) values (database(), '%s', '%s', %d)", + sidecar.GetIdentifier(), table.String(), tableSchema, time.Now().Unix()), &sqltypes.Result{RowsAffected: 1}) + db.AddQuery("rollback", &sqltypes.Result{}) + } + + type testcase struct { + name string + initialCacheState map[string]*Table + expectedQueriesFunc func(db *fakesqldb.DB) + expectFunc func() + } + tests := []testcase{ + { + name: "GetTableForPos with cache uninitialized", + initialCacheState: make(map[string]*Table), // empty + expectedQueriesFunc: func(db *fakesqldb.DB) { + // We do a reload to initialize the cache. + addExpectedReloadQueries(db) + }, + expectFunc: func() { + tbl, err := se.GetTableForPos(ctx, table, "") + require.NoError(t, err) + require.Equal(t, tableMt, tbl) + }, + }, + { + name: "GetTableForPos with cache uninitialized, table not found", + initialCacheState: make(map[string]*Table), // empty + expectedQueriesFunc: func(db *fakesqldb.DB) { + // We do a reload to initialize the cache and in doing so get the missing table. + addExpectedReloadQueries(db) + }, + expectFunc: func() { + tbl, err := se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("nobueno"), "") + require.EqualError(t, err, "table nobueno not found in vttablet schema") + require.Nil(t, tbl) + }, + }, + { + name: "GetTableForPos with cache initialized, table not found", + initialCacheState: map[string]*Table{"t2": {Name: sqlparser.NewIdentifierCS("t2")}}, + expectedQueriesFunc: func(db *fakesqldb.DB) { + // We do a reload to try and get this missing table and any other recently created ones. + addExpectedReloadQueries(db) + }, + expectFunc: func() { + tbl, err := se.GetTableForPos(ctx, table, "") + require.NoError(t, err) + require.Equal(t, tableMt, tbl) + }, + }, + { + name: "GetTableForPos with cache initialized, table found", + initialCacheState: map[string]*Table{table.String(): {Name: table}}, + expectedQueriesFunc: func(db *fakesqldb.DB) { + // We only reload the column and PK info for the table in our cache. A new column + // called col2 has been added to the table schema and it is the new PK. + newTableSchema := fmt.Sprintf("create table %s (%s varchar(50), col2 varchar(50), primary key(col2))", table.String(), column) + db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{ + Fields: mysql.ShowPrimaryFields, + Rows: [][]sqltypes.Value{ + mysql.ShowPrimaryRow(table.String(), "col2"), + }, + }) + db.AddQueryPattern(fmt.Sprintf(mysql.GetColumnNamesQueryPatternForTable, table.String()), + sqltypes.MakeTestResult(sqltypes.MakeTestFields("column_name", "varchar"), column, "col2")) + db.AddQuery(fmt.Sprintf("SELECT `%s`, `%s` FROM `fakesqldb`.`%v` WHERE 1 != 1", + column, "col2", table.String()), sqltypes.MakeTestResult(sqltypes.MakeTestFields(fmt.Sprintf("%s|%s", column, "col2"), "varchar|varchar"))) + db.AddQuery(fmt.Sprintf(`show create table %s`, table.String()), + sqltypes.MakeTestResult(sqltypes.MakeTestFields("Table|Create Table", "varchar|varchar"), table.String(), newTableSchema)) + db.AddQuery("begin", &sqltypes.Result{}) + db.AddQuery(fmt.Sprintf("delete from %s.`tables` where TABLE_SCHEMA = database() and TABLE_NAME in ('%s')", + sidecar.GetIdentifier(), table.String()), &sqltypes.Result{}) + db.AddQuery(fmt.Sprintf("insert into %s.`tables`(TABLE_SCHEMA, TABLE_NAME, CREATE_STATEMENT, CREATE_TIME) values (database(), '%s', '%s', %d)", + sidecar.GetIdentifier(), table.String(), newTableSchema, time.Now().Unix()), &sqltypes.Result{}) + db.AddQuery("rollback", &sqltypes.Result{}) + }, + expectFunc: func() { + tbl, err := se.GetTableForPos(ctx, table, "MySQL56/1497ddb0-7cb9-11ed-a1eb-0242ac120002:1-891") + require.NoError(t, err) + require.NotNil(t, tbl) + require.Equal(t, &binlogdatapb.MinimalTable{ + Name: table.String(), + Fields: []*querypb.Field{ + { + Name: column, + Type: sqltypes.VarChar, + }, + { + Name: "col2", + Type: sqltypes.VarChar, + }, + }, + PKColumns: []int64{1}, // Second column: col2 + }, tbl) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + fakedb.DeleteAllQueries() + AddFakeInnoDBReadRowsResult(fakedb, int(rand.Int32N(1000000))) + tc.expectedQueriesFunc(fakedb) + se.tables = tc.initialCacheState + tc.expectFunc() + fakedb.VerifyAllExecutedOrFail() + require.NoError(t, fakedb.LastError()) + }) + } +} diff --git a/go/vt/vttablet/tabletserver/schema/historian_test.go b/go/vt/vttablet/tabletserver/schema/historian_test.go index 1d66ecefd97..2b7482866f1 100644 --- a/go/vt/vttablet/tabletserver/schema/historian_test.go +++ b/go/vt/vttablet/tabletserver/schema/historian_test.go @@ -17,6 +17,7 @@ limitations under the License. package schema import ( + "context" "fmt" "testing" "time" @@ -25,11 +26,11 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/collations" - "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/sqlparser" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" - "vitess.io/vitess/go/vt/sqlparser" ) func getTable(name string, fieldNames []string, fieldTypes []querypb.Type, pks []int64) *binlogdatapb.MinimalTable { @@ -78,6 +79,7 @@ func getDbSchemaBlob(t *testing.T, tables map[string]*binlogdatapb.MinimalTable) } func TestHistorian(t *testing.T) { + ctx := context.Background() se, db, cancel := getTestSchemaEngine(t, 0) defer cancel() @@ -88,13 +90,13 @@ func TestHistorian(t *testing.T) { ddl1 := "create table tracker_test (id int)" ts1 := int64(1427325876) _, _, _ = ddl1, ts1, db - _, err := se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid1) + _, err := se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid1) require.Equal(t, "table t1 not found in vttablet schema", err.Error()) - tab, err := se.GetTableForPos(sqlparser.NewIdentifierCS("dual"), gtid1) + tab, err := se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("dual"), gtid1) require.NoError(t, err) require.Equal(t, `name:"dual"`, fmt.Sprintf("%v", tab)) se.EnableHistorian(true) - _, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid1) + _, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid1) require.Equal(t, "table t1 not found in vttablet schema", err.Error()) var blob1 string @@ -127,11 +129,11 @@ func TestHistorian(t *testing.T) { }) require.Nil(t, se.RegisterVersionEvent()) exp1 := `name:"t1" fields:{name:"id1" type:INT32 table:"t1" charset:63 flags:32768} fields:{name:"id2" type:INT32 table:"t1" charset:63 flags:32768} p_k_columns:0` - tab, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid1) + tab, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid1) require.NoError(t, err) require.Equal(t, exp1, fmt.Sprintf("%v", tab)) gtid2 := gtidPrefix + "1-20" - _, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid2) + _, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid2) require.Equal(t, "table t1 not found in vttablet schema", err.Error()) table = getTable("t1", []string{"id1", "id2"}, []querypb.Type{querypb.Type_INT32, querypb.Type_VARBINARY}, []int64{0}) @@ -147,11 +149,11 @@ func TestHistorian(t *testing.T) { }) require.Nil(t, se.RegisterVersionEvent()) exp2 := `name:"t1" fields:{name:"id1" type:INT32 table:"t1" charset:63 flags:32768} fields:{name:"id2" type:VARBINARY table:"t1" charset:63 flags:128} p_k_columns:0` - tab, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid2) + tab, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid2) require.NoError(t, err) require.Equal(t, exp2, fmt.Sprintf("%v", tab)) gtid3 := gtidPrefix + "1-30" - _, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid3) + _, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid3) require.Equal(t, "table t1 not found in vttablet schema", err.Error()) table = getTable("t1", []string{"id1", "id2", "id3"}, []querypb.Type{querypb.Type_INT32, querypb.Type_VARBINARY, querypb.Type_INT32}, []int64{0}) @@ -167,22 +169,23 @@ func TestHistorian(t *testing.T) { }) require.Nil(t, se.RegisterVersionEvent()) exp3 := `name:"t1" fields:{name:"id1" type:INT32 table:"t1" charset:63 flags:32768} fields:{name:"id2" type:VARBINARY table:"t1" charset:63 flags:128} fields:{name:"id3" type:INT32 table:"t1" charset:63 flags:32768} p_k_columns:0` - tab, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid3) + tab, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid3) require.NoError(t, err) require.Equal(t, exp3, fmt.Sprintf("%v", tab)) - tab, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid1) + tab, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid1) require.NoError(t, err) require.Equal(t, exp1, fmt.Sprintf("%v", tab)) - tab, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid2) + tab, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid2) require.NoError(t, err) require.Equal(t, exp2, fmt.Sprintf("%v", tab)) - tab, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid3) + tab, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid3) require.NoError(t, err) require.Equal(t, exp3, fmt.Sprintf("%v", tab)) } func TestHistorianPurgeOldSchemas(t *testing.T) { + ctx := context.Background() schemaVersionMaxAgeSeconds := 3600 // 1 hour se, db, cancel := getTestSchemaEngine(t, int64(schemaVersionMaxAgeSeconds)) defer cancel() @@ -194,7 +197,7 @@ func TestHistorianPurgeOldSchemas(t *testing.T) { ts1 := time.Now().Add(time.Duration(-24) * time.Hour) _, _, _ = ddl1, ts1, db se.EnableHistorian(true) - _, err := se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid1) + _, err := se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid1) require.Equal(t, "table t1 not found in vttablet schema", err.Error()) var blob1 string @@ -226,14 +229,14 @@ func TestHistorianPurgeOldSchemas(t *testing.T) { }, }) require.Nil(t, se.RegisterVersionEvent()) - _, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid1) + _, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid1) // validate the old schema has been purged require.Equal(t, "table t1 not found in vttablet schema", err.Error()) require.Equal(t, 0, len(se.historian.schemas)) // add a second schema record row with a time_updated that won't be purged gtid2 := gtidPrefix + "1-20" - _, err = se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid2) + _, err = se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid2) require.Equal(t, "table t1 not found in vttablet schema", err.Error()) table = getTable("t1", []string{"id1", "id2"}, []querypb.Type{querypb.Type_INT32, querypb.Type_VARBINARY}, []int64{0}) @@ -250,7 +253,7 @@ func TestHistorianPurgeOldSchemas(t *testing.T) { }) require.Nil(t, se.RegisterVersionEvent()) exp2 := `name:"t1" fields:{name:"id1" type:INT32 table:"t1" charset:63 flags:32768} fields:{name:"id2" type:VARBINARY table:"t1" charset:63 flags:128} p_k_columns:0` - tab, err := se.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid2) + tab, err := se.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid2) require.NoError(t, err) require.Equal(t, exp2, fmt.Sprintf("%v", tab)) require.Equal(t, 1, len(se.historian.schemas)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go index 27b5fca91c8..49dabae3973 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/helper_event_test.go @@ -35,7 +35,6 @@ package vstreamer // The test framework will not work if the queries use double quotes for string literals at the moment. import ( - "context" "fmt" "slices" "strings" @@ -315,7 +314,6 @@ func (ts *TestSpec) Init() { } ts.pkColumns[t.Name()] = pkColumns } - engine.se.Reload(context.Background()) } // Close() should be called (via defer) at the end of the test to clean up the tables created in the test. @@ -372,7 +370,6 @@ func (ts *TestSpec) getBindVarsForUpdate(stmt sqlparser.Statement) (string, map[ // Run() runs the test. It first initializes the test, then runs the queries and validates the events. func (ts *TestSpec) Run() { - require.NoError(ts.t, engine.se.Reload(context.Background())) if !ts.inited { ts.Init() } @@ -471,20 +468,6 @@ func (ts *TestSpec) getDDLEvent(query string) string { return ddlEvent.String() } -func (ts *TestSpec) reloadSchema() { - engine.se.Reload(context.Background()) - var ddls []string - for _, table := range ts.tables { - showCreateTableDDL := fmt.Sprintf("show create table %s", table) - qr, err := env.Mysqld.FetchSuperQuery(context.Background(), showCreateTableDDL) - require.NoError(ts.t, err) - ddls = append(ddls, qr.Rows[0][1].ToString()) - } - var err error - ts.schema, err = schemadiff.NewSchemaFromQueries(schemadiff.NewTestEnv(), ddls) - require.NoError(ts.t, err) -} - func (ts *TestSpec) getFieldEvent(table *schemadiff.CreateTableEntity) *TestFieldEvent { var tfe TestFieldEvent tfe.table = table.Name() diff --git a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer_test.go index a349c89f0a3..964e06362cd 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/resultstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/resultstreamer_test.go @@ -43,7 +43,6 @@ func TestStreamResults(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) query := "select id, val from t1 order by id" wantStream := []string{ diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go index ac2b2285e27..bb8ff7af85f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go @@ -24,7 +24,6 @@ import ( "time" "vitess.io/vitess/go/mysql/collations" - "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" @@ -147,22 +146,7 @@ func (rs *rowStreamer) buildPlan() error { return err } - st, err := rs.se.GetTableForPos(fromTable, "") - if err != nil { - // There is a scenario where vstreamer's table state can be out-of-date, and this happens - // with vitess migrations, based on vreplication. - // Vitess migrations use an elaborate cut-over flow where tables are swapped away while traffic is - // being blocked. The RENAME flow is such that at some point the table is renamed away, leaving a - // "puncture"; this is an event that is captured by vstreamer. The completion of the flow fixes the - // puncture, and places a new table under the original table's name, but the way it is done does not - // cause vstreamer to refresh schema state. - // There is therefore a reproducible valid sequence of events where vstreamer thinks a table does not - // exist, where it in fact does exist. - // For this reason we give vstreamer a "second chance" to review the up-to-date state of the schema. - // In the future, we will reduce this operation to reading a single table rather than the entire schema. - rs.se.ReloadAt(context.Background(), replication.Position{}) - st, err = rs.se.GetTableForPos(fromTable, "") - } + st, err := rs.se.GetTableForPos(rs.ctx, fromTable, "") if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index 47efb466d3a..48d11d9e856 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -21,7 +21,6 @@ import ( "fmt" "regexp" "testing" - "time" "github.com/stretchr/testify/require" @@ -41,7 +40,6 @@ func TestRowStreamerQuery(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) // We need to StreamRows, to get an initialized RowStreamer. // Note that the query passed into StreamRows is overwritten while running the test. err := engine.StreamRows(context.Background(), "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error { @@ -116,8 +114,6 @@ func TestStreamRowsScan(t *testing.T) { "drop table t5", }) - engine.se.Reload(context.Background()) - // t1: simulates rollup wantStream := []string{ `fields:{name:"1" type:INT64 charset:63} pkfields:{name:"id" type:INT32 charset:63}`, @@ -230,13 +226,13 @@ func TestStreamRowsScan(t *testing.T) { wantQuery = "select /*+ MAX_EXECUTION_TIME(3600000) */ id1, id2, id3, val from t5 force index (`id1_id2_id3`) where (id1 = 1 and id2 = 2 and id3 > 3) or (id1 = 1 and id2 > 2) or (id1 > 1) order by id1, id2, id3" checkStream(t, "select * from t5", []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2), sqltypes.NewInt64(3)}, wantQuery, wantStream) - // t1: test for unsupported integer literal + // t5: test for unsupported integer literal wantError := "only the integer literal 1 is supported" - expectStreamError(t, "select 2 from t1", wantError) + expectStreamError(t, "select 2 from t5", wantError) - // t1: test for unsupported literal type + // t5: test for unsupported literal type wantError = "only integer literals are supported" - expectStreamError(t, "select 'a' from t1", wantError) + expectStreamError(t, "select 'a' from t5", wantError) } func TestStreamRowsUnicode(t *testing.T) { @@ -262,7 +258,6 @@ func TestStreamRowsUnicode(t *testing.T) { return in }) defer engine.Close() - engine.se.Reload(context.Background()) // We need a latin1 connection. conn, err := env.Mysqld.GetDbaConnection(context.Background()) if err != nil { @@ -298,7 +293,6 @@ func TestStreamRowsKeyRange(t *testing.T) { if testing.Short() { t.Skip() } - engine.se.Reload(context.Background()) if err := env.SetVSchema(shardedVSchema); err != nil { t.Fatal(err) @@ -313,9 +307,6 @@ func TestStreamRowsKeyRange(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) - - time.Sleep(1 * time.Second) // Only the first row should be returned, but lastpk should be 6. wantStream := []string{ @@ -346,9 +337,6 @@ func TestStreamRowsFilterInt(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) - - time.Sleep(1 * time.Second) wantStream := []string{ `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, @@ -379,9 +367,6 @@ func TestStreamRowsFilterVarBinary(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) - - time.Sleep(1 * time.Second) wantStream := []string{ `fields:{name:"id1" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id1" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id1" type:INT32 charset:63}`, @@ -407,7 +392,6 @@ func TestStreamRowsMultiPacket(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) wantStream := []string{ `fields:{name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 column_type:"int(11)"} fields:{name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 column_type:"varbinary(128)"} pkfields:{name:"id" type:INT32 charset:63}`, @@ -435,7 +419,6 @@ func TestStreamRowsCancel(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer_test.go index bc6ba98d636..9be3940c01d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/tablestreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/tablestreamer_test.go @@ -51,8 +51,6 @@ func TestTableStreamer(t *testing.T) { "drop table t4", }) - engine.se.Reload(context.Background()) - wantStream := []string{ "table_name:\"t1\" fields:{name:\"id\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id\" column_length:11 charset:63 flags:53251} fields:{name:\"val\" type:VARBINARY table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"val\" column_length:128 charset:63 flags:128} pkfields:{name:\"id\" type:INT32 charset:63 flags:53251}", "table_name:\"t1\" rows:{lengths:1 lengths:3 values:\"1aaa\"} rows:{lengths:1 lengths:3 values:\"2bbb\"} lastpk:{lengths:1 values:\"2\"}", diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 19c27e8a725..389c06a671e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -174,7 +174,6 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - engine.se.Reload(context.Background()) defer execStatements(t, []string{ "drop table t1", diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 70ca7dbaca1..bf41111bbc8 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -805,7 +805,7 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er Flags: mysql.FlagsForColumn(t, coll), }) } - st, err := vs.se.GetTableForPos(sqlparser.NewIdentifierCS(tm.Name), replication.EncodePosition(vs.pos)) + st, err := vs.se.GetTableForPos(vs.ctx, sqlparser.NewIdentifierCS(tm.Name), replication.EncodePosition(vs.pos)) if err != nil { if vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH { log.Infof("No schema found for table %s", tm.Name) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index d3e4d2730b6..8d0d182790e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -224,7 +224,6 @@ func TestSetStatement(t *testing.T) { log.Info("Cannot test SetStatement on this flavor") return } - engine.se.Reload(context.Background()) execStatements(t, []string{ "create table t1(id int, val varbinary(128), primary key(id))", @@ -232,7 +231,6 @@ func TestSetStatement(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) queries := []string{ "begin", "insert into t1 values (1, 'aaa')", @@ -312,6 +310,7 @@ func TestVersion(t *testing.T) { engine = oldEngine }() + ctx := context.Background() err := env.SchemaEngine.EnableHistorian(true) require.NoError(t, err) defer env.SchemaEngine.EnableHistorian(false) @@ -334,7 +333,6 @@ func TestVersion(t *testing.T) { }}, } blob, _ := dbSchema.MarshalVT() - engine.se.Reload(context.Background()) gtid := "MariaDB/0-41983-20" testcases := []testcase{{ input: []string{ @@ -348,7 +346,7 @@ func TestVersion(t *testing.T) { `commit`}}, }} runCases(t, nil, testcases, "", nil) - mt, err := env.SchemaEngine.GetTableForPos(sqlparser.NewIdentifierCS("t1"), gtid) + mt, err := env.SchemaEngine.GetTableForPos(ctx, sqlparser.NewIdentifierCS("t1"), gtid) require.NoError(t, err) assert.True(t, proto.Equal(mt, dbSchema.Tables[0])) } @@ -373,7 +371,6 @@ func TestMissingTables(t *testing.T) { "insert into shortlived values (1,1), (2,2)", "alter table shortlived rename to _shortlived", }) - engine.se.Reload(context.Background()) filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "t1", @@ -1059,7 +1056,6 @@ func TestREKeyRange(t *testing.T) { } }` setVSchema(t, altVSchema) - engine.se.Reload(context.Background()) ts.Reset() // Only the first insert should be sent. ts.tests = [][]*TestQuery{{ @@ -1289,7 +1285,6 @@ func TestDDLAddColumn(t *testing.T) { } func TestDDLDropColumn(t *testing.T) { - env.SchemaEngine.Reload(context.Background()) execStatement(t, "create table ddl_test2(id int, val1 varbinary(128), val2 varbinary(128), primary key(id))") defer execStatement(t, "drop table ddl_test2") @@ -1301,8 +1296,6 @@ func TestDDLDropColumn(t *testing.T) { "alter table ddl_test2 drop column val2", "insert into ddl_test2 values(2, 'bbb')", }) - engine.se.Reload(context.Background()) - env.SchemaEngine.Reload(context.Background()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -1439,7 +1432,6 @@ func TestBestEffortNameInFieldEvent(t *testing.T) { defer execStatements(t, []string{ "drop table vitess_test_new", }) - engine.se.Reload(context.Background()) testcases := []testcase{{ input: []string{ "insert into vitess_test_new values(2, 'abc')", @@ -1499,7 +1491,6 @@ func TestInternalTables(t *testing.T) { "drop table _vt_PURGE_1f9194b43b2011eb8a0104ed332e05c2_20201210194431", "drop table _product_old", }) - engine.se.Reload(context.Background()) testcases := []testcase{{ input: []string{ "insert into vitess_test values(2, 'abc')", @@ -1541,7 +1532,6 @@ func TestTypes(t *testing.T) { "drop table vitess_null", "drop table vitess_decimal", }) - engine.se.Reload(context.Background()) testcases := []testcase{{ input: []string{ @@ -1676,7 +1666,6 @@ func TestExternalTable(t *testing.T) { defer execStatements(t, []string{ "drop database external", }) - engine.se.Reload(context.Background()) testcases := []testcase{{ input: []string{ @@ -1701,7 +1690,6 @@ func TestJournal(t *testing.T) { defer execStatements(t, []string{ "drop table _vt.resharding_journal", }) - engine.se.Reload(context.Background()) journal1 := &binlogdatapb.Journal{ Id: 1, @@ -1758,8 +1746,6 @@ func TestStatementMode(t *testing.T) { "create table stream2(id int, val varbinary(128), primary key(id))", }) - engine.se.Reload(context.Background()) - defer execStatements(t, []string{ "drop table stream1", "drop table stream2", @@ -1807,7 +1793,6 @@ func TestNoFutureGTID(t *testing.T) { defer execStatements(t, []string{ "drop table stream1", }) - engine.se.Reload(context.Background()) pos := primaryPosition(t) t.Logf("current position: %v", pos)