Skip to content

Commit

Permalink
fix releasing the global read lock when mysqlshell backup fails (vite…
Browse files Browse the repository at this point in the history
…ssio#17000)

Signed-off-by: Renan Rangel <rrangel@slack-corp.com>
Signed-off-by: 'Renan Rangel' <rrangel@slack-corp.com>
  • Loading branch information
rvrangel committed Oct 24, 2024
1 parent 4442173 commit c06b527
Show file tree
Hide file tree
Showing 6 changed files with 443 additions and 6 deletions.
39 changes: 39 additions & 0 deletions go/ioutil/writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
MeteredWriteCloser and MeteredWriter are respectively, time-and-byte-tracking
wrappers around WriteCloser and Writer.
*/

package ioutil

import (
"bytes"
)

// BytesBufferWriter implements io.WriteCloser using an in-memory buffer.
type BytesBufferWriter struct {
*bytes.Buffer
}

func (m BytesBufferWriter) Close() error {
return nil
}

func NewBytesBufferWriter() BytesBufferWriter {
return BytesBufferWriter{bytes.NewBuffer(nil)}
}
92 changes: 92 additions & 0 deletions go/vt/mysqlctl/fakebackupengine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mysqlctl

import (
"context"
"time"

"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
)

type FakeBackupEngine struct {
ExecuteBackupCalls []FakeBackupEngineExecuteBackupCall
ExecuteBackupDuration time.Duration
ExecuteBackupReturn FakeBackupEngineExecuteBackupReturn
ExecuteRestoreCalls []FakeBackupEngineExecuteRestoreCall
ExecuteRestoreDuration time.Duration
ExecuteRestoreReturn FakeBackupEngineExecuteRestoreReturn
ShouldDrainForBackupCalls int
ShouldDrainForBackupReturn bool
}

type FakeBackupEngineExecuteBackupCall struct {
BackupParams BackupParams
BackupHandle backupstorage.BackupHandle
}

type FakeBackupEngineExecuteBackupReturn struct {
Ok bool
Err error
}

type FakeBackupEngineExecuteRestoreCall struct {
BackupHandle backupstorage.BackupHandle
RestoreParams RestoreParams
}

type FakeBackupEngineExecuteRestoreReturn struct {
Manifest *BackupManifest
Err error
}

func (be *FakeBackupEngine) ExecuteBackup(
ctx context.Context,
params BackupParams,
bh backupstorage.BackupHandle,
) (bool, error) {
be.ExecuteBackupCalls = append(be.ExecuteBackupCalls, FakeBackupEngineExecuteBackupCall{params, bh})

if be.ExecuteBackupDuration > 0 {
time.Sleep(be.ExecuteBackupDuration)
}

return be.ExecuteBackupReturn.Ok, be.ExecuteBackupReturn.Err
}

func (be *FakeBackupEngine) ExecuteRestore(
ctx context.Context, params RestoreParams,
bh backupstorage.BackupHandle,
) (*BackupManifest, error) {
be.ExecuteRestoreCalls = append(be.ExecuteRestoreCalls, FakeBackupEngineExecuteRestoreCall{bh, params})

// mark restore as in progress
if err := createStateFile(params.Cnf); err != nil {
return nil, err
}

if be.ExecuteRestoreDuration > 0 {
time.Sleep(be.ExecuteRestoreDuration)
}

return be.ExecuteRestoreReturn.Manifest, be.ExecuteRestoreReturn.Err
}

func (be *FakeBackupEngine) ShouldDrainForBackup() bool {
be.ShouldDrainForBackupCalls = be.ShouldDrainForBackupCalls + 1
return be.ShouldDrainForBackupReturn
}
160 changes: 160 additions & 0 deletions go/vt/mysqlctl/fakebackupstorage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package mysqlctl

import (
"context"
"fmt"
"io"

"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
)

type FakeBackupHandle struct {
Dir string
NameV string
ReadOnly bool
Errors concurrency.AllErrorRecorder

AbortBackupCalls []context.Context
AbortBackupReturn error
AddFileCalls []FakeBackupHandleAddFileCall
AddFileReturn FakeBackupHandleAddFileReturn
EndBackupCalls []context.Context
EndBackupReturn error
ReadFileCalls []FakeBackupHandleReadFileCall
ReadFileReturnF func(ctx context.Context, filename string) (io.ReadCloser, error)
}

type FakeBackupHandleAddFileCall struct {
Ctx context.Context
Filename string
Filesize int64
}

type FakeBackupHandleAddFileReturn struct {
WriteCloser io.WriteCloser
Err error
}

type FakeBackupHandleReadFileCall struct {
Ctx context.Context
Filename string
}

func (fbh *FakeBackupHandle) RecordError(err error) {
fbh.Errors.RecordError(err)
}

func (fbh *FakeBackupHandle) HasErrors() bool {
return fbh.Errors.HasErrors()
}

func (fbh *FakeBackupHandle) Error() error {
return fbh.Errors.Error()
}

func (fbh *FakeBackupHandle) Directory() string {
return fbh.Dir
}

func (fbh *FakeBackupHandle) Name() string {
return fbh.NameV
}

func (fbh *FakeBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
fbh.AddFileCalls = append(fbh.AddFileCalls, FakeBackupHandleAddFileCall{ctx, filename, filesize})
return fbh.AddFileReturn.WriteCloser, fbh.AddFileReturn.Err
}

func (fbh *FakeBackupHandle) EndBackup(ctx context.Context) error {
fbh.EndBackupCalls = append(fbh.EndBackupCalls, ctx)
return fbh.EndBackupReturn
}

func (fbh *FakeBackupHandle) AbortBackup(ctx context.Context) error {
fbh.AbortBackupCalls = append(fbh.AbortBackupCalls, ctx)
return fbh.AbortBackupReturn
}

func (fbh *FakeBackupHandle) ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) {
fbh.ReadFileCalls = append(fbh.ReadFileCalls, FakeBackupHandleReadFileCall{ctx, filename})
if fbh.ReadFileReturnF == nil {
return nil, fmt.Errorf("FakeBackupHandle has not defined a ReadFileReturnF")
}
return fbh.ReadFileReturnF(ctx, filename)
}

type FakeBackupStorage struct {
CloseCalls int
CloseReturn error
ListBackupsCalls []FakeBackupStorageListBackupsCall
ListBackupsReturn FakeBackupStorageListBackupsReturn
RemoveBackupCalls []FakeBackupStorageRemoveBackupCall
RemoveBackupReturn error
RemoveBackupReturne error
StartBackupCalls []FakeBackupStorageStartBackupCall
StartBackupReturn FakeBackupStorageStartBackupReturn
}

type FakeBackupStorageListBackupsCall struct {
Ctx context.Context
Dir string
}

type FakeBackupStorageListBackupsReturn struct {
BackupHandles []backupstorage.BackupHandle
Err error
}

type FakeBackupStorageRemoveBackupCall struct {
Ctx context.Context
Dir string
Name string
}

type FakeBackupStorageStartBackupCall struct {
Ctx context.Context
Dir string
Name string
}

type FakeBackupStorageStartBackupReturn struct {
BackupHandle backupstorage.BackupHandle
Err error
}

func (fbs *FakeBackupStorage) ListBackups(ctx context.Context, dir string) ([]backupstorage.BackupHandle, error) {
fbs.ListBackupsCalls = append(fbs.ListBackupsCalls, FakeBackupStorageListBackupsCall{ctx, dir})
return fbs.ListBackupsReturn.BackupHandles, fbs.ListBackupsReturn.Err
}

func (fbs *FakeBackupStorage) StartBackup(ctx context.Context, dir, name string) (backupstorage.BackupHandle, error) {
fbs.StartBackupCalls = append(fbs.StartBackupCalls, FakeBackupStorageStartBackupCall{ctx, dir, name})
return fbs.StartBackupReturn.BackupHandle, fbs.StartBackupReturn.Err
}

func (fbs *FakeBackupStorage) RemoveBackup(ctx context.Context, dir, name string) error {
fbs.RemoveBackupCalls = append(fbs.RemoveBackupCalls, FakeBackupStorageRemoveBackupCall{ctx, dir, name})
return fbs.RemoveBackupReturn
}

func (fbs *FakeBackupStorage) Close() error {
fbs.CloseCalls = fbs.CloseCalls + 1
return fbs.CloseReturn
}
22 changes: 18 additions & 4 deletions go/vt/mysqlctl/fakemysqldaemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,12 @@ type FakeMysqlDaemon struct {
// SemiSyncReplicaEnabled represents the state of rpl_semi_sync_slave_enabled.
SemiSyncReplicaEnabled bool

// TimeoutHook is a func that can be called at the beginning of any method to fake a timeout.
// all a test needs to do is make it { return context.DeadlineExceeded }
// GlobalReadLock is used to test if a lock has been acquired already or not
GlobalReadLock bool

// TimeoutHook is a func that can be called at the beginning of
// any method to fake a timeout.
// All a test needs to do is make it { return context.DeadlineExceeded }.
TimeoutHook func() error

// Version is the version that will be returned by GetVersionString.
Expand Down Expand Up @@ -684,10 +688,20 @@ func (fmd *FakeMysqlDaemon) GetVersionComment(ctx context.Context) string {

// AcquireGlobalReadLock is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) AcquireGlobalReadLock(ctx context.Context) error {
return errors.New("not implemented")
if fmd.GlobalReadLock {
return errors.New("lock already acquired")
}

fmd.GlobalReadLock = true
return nil
}

// ReleaseGlobalReadLock is part of the MysqlDaemon interface.
func (fmd *FakeMysqlDaemon) ReleaseGlobalReadLock(ctx context.Context) error {
return errors.New("not implemented")
if fmd.GlobalReadLock {
fmd.GlobalReadLock = false
return nil
}

return errors.New("no read locks acquired yet")
}
16 changes: 14 additions & 2 deletions go/vt/mysqlctl/mysqlshellbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ var (
// disable redo logging and double write buffer
mysqlShellSpeedUpRestore = false

mysqlShellBackupBinaryName = "mysqlsh"

// use when checking if we need to create the directory on the local filesystem or not.
knownObjectStoreParams = []string{"s3BucketName", "osBucketName", "azureContainerName"}

Expand Down Expand Up @@ -104,8 +106,8 @@ type MySQLShellBackupEngine struct {
}

const (
mysqlShellBackupBinaryName = "mysqlsh"
mysqlShellBackupEngineName = "mysqlshell"
mysqlShellLockMessage = "Global read lock has been released"
)

func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle) (result bool, finalErr error) {
Expand Down Expand Up @@ -139,6 +141,11 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back
}
lockAcquired := time.Now() // we will report how long we hold the lock for

// we need to release the global read lock in case the backup fails to start and
// the lock wasn't released by releaseReadLock() yet. context might be expired,
// so we pass a new one.
defer func() { _ = params.Mysqld.ReleaseGlobalReadLock(context.Background()) }()

posBeforeBackup, err := params.Mysqld.PrimaryPosition()
if err != nil {
return false, vterrors.Wrap(err, "failed to fetch position")
Expand Down Expand Up @@ -171,6 +178,7 @@ func (be *MySQLShellBackupEngine) ExecuteBackup(ctx context.Context, params Back

// Get exit status.
if err := cmd.Wait(); err != nil {
pipeWriter.Close() // make sure we close the writer so the goroutines above will complete.
return false, vterrors.Wrap(err, mysqlShellBackupEngineName+" failed")
}

Expand Down Expand Up @@ -471,7 +479,7 @@ func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams,

if !released {

if !strings.Contains(line, "Global read lock has been released") {
if !strings.Contains(line, mysqlShellLockMessage) {
continue
}
released = true
Expand All @@ -489,6 +497,10 @@ func releaseReadLock(ctx context.Context, reader io.Reader, params BackupParams,
if err := scanner.Err(); err != nil {
params.Logger.Errorf("error reading from reader: %v", err)
}

if !released {
params.Logger.Errorf("could not release global lock earlier")
}
}

func cleanupMySQL(ctx context.Context, params RestoreParams, shouldDeleteUsers bool) error {
Expand Down
Loading

0 comments on commit c06b527

Please sign in to comment.