Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,5 +116,5 @@ func TestCheckSysTableCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(241), session.CurrentBootstrapVersion)
require.Equal(t, int64(242), session.CurrentBootstrapVersion)
}
1 change: 1 addition & 0 deletions pkg/ddl/schematracker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,7 @@ func (d *Checker) DoDDLJobWrapper(ctx sessionctx.Context, jobW *ddl.JobWrapper)

type storageAndMore interface {
kv.Storage
kv.StorageWithPD
kv.EtcdBackend
helper.Storage
}
Expand Down
35 changes: 34 additions & 1 deletion pkg/session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,8 @@ const (
tidbDefOOMAction = "default_oom_action"
// The variable name in mysql.tidb table and it records the current DDLTableVersion
tidbDDLTableVersion = "ddl_table_version"
// The variable name in mysql.tidb table and it records the cluster id of this cluster
tidbClusterID = "cluster_id"
// Const for TiDB server version 2.
version2 = 2
version3 = 3
Expand Down Expand Up @@ -1242,11 +1244,15 @@ const (

// Add index on user field for some mysql tables.
version241 = 241

// version 242
// insert `cluster_id` into the `mysql.tidb` table.
version242 = 242
)

// currentBootstrapVersion is defined as a variable, so we can modify its value for testing.
// please make sure this is the largest version
var currentBootstrapVersion int64 = version241
var currentBootstrapVersion int64 = version242

// DDL owner key's expired time is ManagerSessionTTL seconds, we should wait the time and give more time to have a chance to finish it.
var internalSQLTimeout = owner.ManagerSessionTTL + 15
Expand Down Expand Up @@ -1423,6 +1429,7 @@ var (
upgradeToVer239,
upgradeToVer240,
upgradeToVer241,
upgradeToVer242,
}
)

Expand Down Expand Up @@ -3318,6 +3325,30 @@ func upgradeToVer241(s sessiontypes.Session, ver int64) {
doReentrantDDL(s, "ALTER TABLE mysql.default_roles ADD INDEX i_user (user)", dbterror.ErrDupKeyName)
}

// writeClusterID writes cluster id into mysql.tidb
func writeClusterID(s sessiontypes.Session) {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(internalSQLTimeout)*time.Second)
defer cancel()

clusterID := s.GetDomain().(*domain.Domain).GetPDClient().GetClusterID(ctx)

mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB Cluster ID.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
mysql.SystemDB,
mysql.TiDBTable,
tidbClusterID,
clusterID,
clusterID,
)
}

func upgradeToVer242(s sessiontypes.Session, ver int64) {
if ver >= version242 {
return
}

writeClusterID(s)
}

// initGlobalVariableIfNotExists initialize a global variable with specific val if it does not exist.
func initGlobalVariableIfNotExists(s sessiontypes.Session, name string, val any) {
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
Expand Down Expand Up @@ -3571,6 +3602,8 @@ func doDMLWorks(s sessiontypes.Session) {

writeDDLTableVersion(s)

writeClusterID(s)

ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap)
_, err := s.ExecuteInternal(ctx, "COMMIT")
if err != nil {
Expand Down
56 changes: 56 additions & 0 deletions pkg/session/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2704,3 +2704,59 @@ func TestGetFuncName(t *testing.T) {
}
})
}

func TestWriteClusterIDToMySQLTiDBWhenUpgradingTo242(t *testing.T) {
ctx := context.Background()
store, dom := CreateStoreAndBootstrap(t)
defer func() { require.NoError(t, store.Close()) }()

// `cluster_id` is inserted for a new TiDB cluster.
se := CreateSessionAndSetID(t, store)
r := MustExecToRecodeSet(t, se, `select VARIABLE_VALUE from mysql.tidb where VARIABLE_NAME='cluster_id'`)
req := r.NewChunk(nil)
err := r.Next(ctx, req)
require.NoError(t, err)
require.Equal(t, 1, req.NumRows())
require.NotEmpty(t, req.GetRow(0).GetBytes(0))
require.NoError(t, r.Close())
se.Close()

// bootstrap as version241
ver241 := version241
seV241 := CreateSessionAndSetID(t, store)
txn, err := store.Begin()
require.NoError(t, err)
m := meta.NewMutator(txn)
err = m.FinishBootstrap(int64(ver241))
require.NoError(t, err)
revertVersionAndVariables(t, seV241, ver241)
// remove the cluster_id entry from mysql.tidb table
MustExec(t, seV241, "delete from mysql.tidb where variable_name='cluster_id'")
err = txn.Commit(ctx)
require.NoError(t, err)
store.SetOption(StoreBootstrappedKey, nil)
ver, err := getBootstrapVersion(seV241)
require.NoError(t, err)
require.Equal(t, int64(ver241), ver)
seV241.Close()

// upgrade to current version
dom.Close()
domCurVer, err := BootstrapSession(store)
require.NoError(t, err)
defer domCurVer.Close()
seCurVer := CreateSessionAndSetID(t, store)
ver, err = getBootstrapVersion(seCurVer)
require.NoError(t, err)
require.Equal(t, currentBootstrapVersion, ver)

// check if the cluster_id has been set in the `mysql.tidb` table during upgrade
r = MustExecToRecodeSet(t, seCurVer, `select VARIABLE_VALUE from mysql.tidb where VARIABLE_NAME='cluster_id'`)
req = r.NewChunk(nil)
err = r.Next(ctx, req)
require.NoError(t, err)
require.Equal(t, 1, req.NumRows())
require.NotEmpty(t, req.GetRow(0).GetBytes(0))
require.NoError(t, r.Close())
seCurVer.Close()
}