Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Online DDL: ensure high lock_wait_timeout in VReplication cut-over #16601

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,19 @@ func testScheduler(t *testing.T) {
})
}

var originalLockWaitTimeout int64
t.Run("set low lock_wait_timeout", func(t *testing.T) {
rs, err := primaryTablet.VttabletProcess.QueryTablet("select @@lock_wait_timeout as lock_wait_timeout", keyspaceName, false)
require.NoError(t, err)
row := rs.Named().Row()
require.NotNil(t, row)
originalLockWaitTimeout = row.AsInt64("lock_wait_timeout", 0)
require.NotZero(t, originalLockWaitTimeout)

_, err = primaryTablet.VttabletProcess.QueryTablet("set global lock_wait_timeout=1", keyspaceName, false)
require.NoError(t, err)
})

// CREATE
t.Run("CREATE TABLEs t1, t2", func(t *testing.T) {
{ // The table does not exist
Expand Down Expand Up @@ -578,6 +591,16 @@ func testScheduler(t *testing.T) {
assert.NotEmpty(t, rs.Rows)
})

t.Run("low @@lock_wait_timeout", func(t *testing.T) {
defer primaryTablet.VttabletProcess.QueryTablet(fmt.Sprintf("set global lock_wait_timeout=%d", originalLockWaitTimeout), keyspaceName, false)

t1uuid = testOnlineDDLStatement(t, createParams(trivialAlterT1Statement, ddlStrategy, "vtgate", "", "", false)) // wait
t.Run("trivial t1 migration", func(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusComplete)
checkTable(t, t1Name, true)
})
})

forceCutoverCapable, err := capableOf(capabilities.PerformanceSchemaDataLocksTableCapability) // 8.0
require.NoError(t, err)
if forceCutoverCapable {
Expand Down
33 changes: 33 additions & 0 deletions go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,14 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
if err != nil {
return err
}
// Set a large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The cut-over code itself ensures that everything which needs to be terminated by `migrationCutOverThreshold` is terminated.
lockConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, lockConn.Conn, 5*migrationCutOverThreshold)
if err != nil {
return err
}
defer lockConn.Recycle()
defer lockConnRestoreLockWaitTimeout()
defer lockConn.Conn.Exec(ctx, sqlUnlockTables, 1, false)

renameCompleteChan := make(chan error)
Expand All @@ -988,6 +995,12 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
if err != nil {
return err
}
// Set a large enough `@@lock_wait_timeout` so that it does not interfere with the cut-over operation.
// The cut-over code itself ensures that everything which needs to be terminated by `migrationCutOverThreshold` is terminated.
renameConnRestoreLockWaitTimeout, err := e.initConnectionLockWaitTimeout(ctx, renameConn.Conn, 5*migrationCutOverThreshold*4)
if err != nil {
return err
}
defer renameConn.Recycle()
defer func() {
if !renameWasSuccessful {
Expand All @@ -997,6 +1010,8 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
}
}
}()
defer renameConnRestoreLockWaitTimeout()

// See if backend MySQL server supports 'rename_table_preserve_foreign_key' variable
preserveFKSupported, err := e.isPreserveForeignKeySupported(ctx)
if err != nil {
Expand Down Expand Up @@ -1260,6 +1275,24 @@ func (e *Executor) initMigrationSQLMode(ctx context.Context, onlineDDL *schema.O
return deferFunc, nil
}

// initConnectionLockWaitTimeout sets the given lock_wait_timeout for the given connection, with a deferred value restoration function
func (e *Executor) initConnectionLockWaitTimeout(ctx context.Context, conn *connpool.Conn, lockWaitTimeout time.Duration) (deferFunc func(), err error) {
deferFunc = func() {}
mattlord marked this conversation as resolved.
Show resolved Hide resolved

if _, err := conn.Exec(ctx, `set @lock_wait_timeout=@@session.lock_wait_timeout`, 0, false); err != nil {
return deferFunc, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "could not read lock_wait_timeout: %v", err)
}
timeoutSeconds := int64(lockWaitTimeout.Seconds())
setQuery := fmt.Sprintf("set @@session.lock_wait_timeout=%d", timeoutSeconds)
if _, err := conn.Exec(ctx, setQuery, 0, false); err != nil {
return deferFunc, err
}
deferFunc = func() {
conn.Exec(ctx, "set @@session.lock_wait_timeout=@lock_wait_timeout", 0, false)
}
return deferFunc, nil
}

// newConstraintName generates a new, unique name for a constraint. Our problem is that a MySQL
// constraint's name is unique in the schema (!). And so as we duplicate the original table, we must
// create completely new names for all constraints.
Expand Down
Loading