From 43a9ed8a1da189e9e27f6f70c79fa3eeea7a85e2 Mon Sep 17 00:00:00 2001 From: Yang Keao Date: Thu, 13 Feb 2025 14:32:30 +0800 Subject: [PATCH] add cluster_id to the mysql.tidb table Signed-off-by: Yang Keao --- .../snap_client/systable_restore_test.go | 2 +- pkg/ddl/schematracker/checker.go | 1 + pkg/session/bootstrap.go | 35 +++++++++++- pkg/session/bootstrap_test.go | 56 +++++++++++++++++++ 4 files changed, 92 insertions(+), 2 deletions(-) diff --git a/br/pkg/restore/snap_client/systable_restore_test.go b/br/pkg/restore/snap_client/systable_restore_test.go index 504b3a45cd6fa..f9308576edc17 100644 --- a/br/pkg/restore/snap_client/systable_restore_test.go +++ b/br/pkg/restore/snap_client/systable_restore_test.go @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) { // // The above variables are in the file br/pkg/restore/systable_restore.go func TestMonitorTheSystemTableIncremental(t *testing.T) { - require.Equal(t, int64(241), session.CurrentBootstrapVersion) + require.Equal(t, int64(242), session.CurrentBootstrapVersion) } diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index 5aac03ebcc6dd..bdeadf44ec26c 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -565,6 +565,7 @@ func (d *Checker) DoDDLJobWrapper(ctx sessionctx.Context, jobW *ddl.JobWrapper) type storageAndMore interface { kv.Storage + kv.StorageWithPD kv.EtcdBackend helper.Storage } diff --git a/pkg/session/bootstrap.go b/pkg/session/bootstrap.go index 1ba6049a10676..6e37280bd4e51 100644 --- a/pkg/session/bootstrap.go +++ b/pkg/session/bootstrap.go @@ -859,6 +859,8 @@ const ( tidbDefOOMAction = "default_oom_action" // The variable name in mysql.tidb table and it records the current DDLTableVersion tidbDDLTableVersion = "ddl_table_version" + // The variable name in mysql.tidb table and it records the cluster id of this cluster + tidbClusterID = "cluster_id" // Const for TiDB server version 2. version2 = 2 version3 = 3 @@ -1242,11 +1244,15 @@ const ( // Add index on user field for some mysql tables. version241 = 241 + + // version 242 + // insert `cluster_id` into the `mysql.tidb` table. + version242 = 242 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version -var currentBootstrapVersion int64 = version241 +var currentBootstrapVersion int64 = version242 // DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it. var internalSQLTimeout = owner.ManagerSessionTTL + 15 @@ -1423,6 +1429,7 @@ var ( upgradeToVer239, upgradeToVer240, upgradeToVer241, + upgradeToVer242, } ) @@ -3318,6 +3325,30 @@ func upgradeToVer241(s sessiontypes.Session, ver int64) { doReentrantDDL(s, "ALTER TABLE mysql.default_roles ADD INDEX i_user (user)", dbterror.ErrDupKeyName) } +// writeClusterID writes cluster id into mysql.tidb +func writeClusterID(s sessiontypes.Session) { + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(internalSQLTimeout)*time.Second) + defer cancel() + + clusterID := s.GetDomain().(*domain.Domain).GetPDClient().GetClusterID(ctx) + + mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB Cluster ID.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, + mysql.SystemDB, + mysql.TiDBTable, + tidbClusterID, + clusterID, + clusterID, + ) +} + +func upgradeToVer242(s sessiontypes.Session, ver int64) { + if ver >= version242 { + return + } + + writeClusterID(s) +} + // initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist. func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) @@ -3571,6 +3602,8 @@ func doDMLWorks(s sessiontypes.Session) { writeDDLTableVersion(s) + writeClusterID(s) + ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) _, err := s.ExecuteInternal(ctx, "COMMIT") if err != nil { diff --git a/pkg/session/bootstrap_test.go b/pkg/session/bootstrap_test.go index c03d77a2774db..06aabd8234c25 100644 --- a/pkg/session/bootstrap_test.go +++ b/pkg/session/bootstrap_test.go @@ -2704,3 +2704,59 @@ func TestGetFuncName(t *testing.T) { } }) } + +func TestWriteClusterIDToMySQLTiDBWhenUpgradingTo242(t *testing.T) { + ctx := context.Background() + store, dom := CreateStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + // `cluster_id` is inserted for a new TiDB cluster. + se := CreateSessionAndSetID(t, store) + r := MustExecToRecodeSet(t, se, `select VARIABLE_VALUE from mysql.tidb where VARIABLE_NAME='cluster_id'`) + req := r.NewChunk(nil) + err := r.Next(ctx, req) + require.NoError(t, err) + require.Equal(t, 1, req.NumRows()) + require.NotEmpty(t, req.GetRow(0).GetBytes(0)) + require.NoError(t, r.Close()) + se.Close() + + // bootstrap as version241 + ver241 := version241 + seV241 := CreateSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMutator(txn) + err = m.FinishBootstrap(int64(ver241)) + require.NoError(t, err) + revertVersionAndVariables(t, seV241, ver241) + // remove the cluster_id entry from mysql.tidb table + MustExec(t, seV241, "delete from mysql.tidb where variable_name='cluster_id'") + err = txn.Commit(ctx) + require.NoError(t, err) + store.SetOption(StoreBootstrappedKey, nil) + ver, err := getBootstrapVersion(seV241) + require.NoError(t, err) + require.Equal(t, int64(ver241), ver) + seV241.Close() + + // upgrade to current version + dom.Close() + domCurVer, err := BootstrapSession(store) + require.NoError(t, err) + defer domCurVer.Close() + seCurVer := CreateSessionAndSetID(t, store) + ver, err = getBootstrapVersion(seCurVer) + require.NoError(t, err) + require.Equal(t, currentBootstrapVersion, ver) + + // check if the cluster_id has been set in the `mysql.tidb` table during upgrade + r = MustExecToRecodeSet(t, seCurVer, `select VARIABLE_VALUE from mysql.tidb where VARIABLE_NAME='cluster_id'`) + req = r.NewChunk(nil) + err = r.Next(ctx, req) + require.NoError(t, err) + require.Equal(t, 1, req.NumRows()) + require.NotEmpty(t, req.GetRow(0).GetBytes(0)) + require.NoError(t, r.Close()) + seCurVer.Close() +}