Skip to content

Commit

Permalink
Fix potential race conditions and refactor the unit tests
Browse files Browse the repository at this point in the history
1. In the manager, the backing image check and registration should be
done while holding the same lock.

2. In the old backing image implementation, if the backing image
deletion happens before the actual pulling/receiving starts,
the download cannot be cancelled.

3. State `starting` of BackingImage now means the backing image has
initialized the download but the actual download has not started yet.
State `pending` means the backing image has just been registered but
hasn't initialized the download. We can then rely on the state to
refuse duplicate download calls.

4. In the downloader, we need to make sure that checking and updating
the download-related fields is atomic.

5. If the backing image state is empty but the downloader is somehow
in an unexpected state, it's the BackingImage's responsibility to cancel
the download. If the downloader finds that something is wrong with
the download context, it's the downloader's responsibility to
cancel the download and reset itself.

Signed-off-by: Shuo Wu <shuo.wu@suse.com>
  • Loading branch information
shuo-wu authored and innobead committed Apr 20, 2021
1 parent 6604bf0 commit 904cd77
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 155 deletions.
136 changes: 87 additions & 49 deletions pkg/server/backing_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type state string

const (
StatePending = state(types.DownloadStatePending)
StateStarting = state(types.DownloadStateStarting)
StateDownloading = state(types.DownloadStateDownloading)
StateDownloaded = state(types.DownloadStateDownloaded)
StateFailed = state(types.DownloadStateFailed)
Expand Down Expand Up @@ -50,10 +51,10 @@ type BackingImage struct {
log logrus.FieldLogger
updateCh chan interface{}

downloader Downloader
downloader *BackingImageDownloader
}

func NewBackingImage(name, url, uuid, diskPathOnHost, diskPathInContainer string, downloader Downloader, updateCh chan interface{}) *BackingImage {
func NewBackingImage(name, url, uuid, diskPathOnHost, diskPathInContainer string, downloader *BackingImageDownloader, updateCh chan interface{}) *BackingImage {
hostDir := filepath.Join(diskPathOnHost, types.BackingImageManagerDirectoryName, GetBackingImageDirectoryName(name, uuid))
workDir := filepath.Join(diskPathInContainer, types.BackingImageManagerDirectoryName, GetBackingImageDirectoryName(name, uuid))
return &BackingImage{
Expand All @@ -62,7 +63,7 @@ func NewBackingImage(name, url, uuid, diskPathOnHost, diskPathInContainer string
URL: url,
HostDirectory: hostDir,
WorkDirectory: workDir,
state: StatePending,
state: types.DownloadStatePending,
log: logrus.StandardLogger().WithFields(
logrus.Fields{
"component": "backing-image",
Expand All @@ -85,22 +86,37 @@ func GetBackingImageDirectoryName(biName, biUUID string) string {

func (bi *BackingImage) Pull() (resp *rpc.BackingImageResponse, err error) {
bi.lock.Lock()
log := bi.log
log.Info("Backing Image: start to pull backing image")

if bi.state != types.DownloadStatePending {
state := bi.state
bi.lock.Unlock()
return nil, fmt.Errorf("invalid state %v for pulling", state)
}

defer func() {
if err != nil {
bi.state = StateFailed
bi.errorMsg = err.Error()
bi.log.WithError(err).Error("Backing Image: failed to pull backing image")
log.WithError(err).Error("Backing Image: failed to pull backing image")
bi.downloader.Cancel()
}
bi.lock.Unlock()
bi.updateCh <- nil
}()
bi.log.Info("Backing Image: start to pull backing image")

// This means state was pending but somehow the downloader had been initialized.
if err := bi.downloader.InitDownloading(); err != nil {
return nil, errors.Wrapf(err, "failed to ask for the downloader init before pulling")
}
bi.state = types.DownloadStateStarting

if err = bi.checkAndReuseBackingImageFileWithoutLock(); err == nil {
bi.log.Infof("Backing Image: succeeded to reuse the existing backing image file, will skip pulling")
log.Infof("Backing Image: succeeded to reuse the existing backing image file, will skip pulling")
return bi.rpcResponse(), nil
}
bi.log.Infof("Backing Image: failed to try to check or reuse the possible existing backing image file, will start pulling then: %v", err)
log.Infof("Backing Image: failed to try to check or reuse the possible existing backing image file, will start pulling then: %v", err)

if err := bi.prepareForDownload(); err != nil {
return nil, errors.Wrapf(err, "failed to prepare for pulling")
Expand All @@ -111,7 +127,7 @@ func (bi *BackingImage) Pull() (resp *rpc.BackingImageResponse, err error) {
return nil, errors.Wrapf(err, "failed to get file size before pulling")
}
if size <= 0 {
bi.log.Warnf("Backing Image: cannot get size from URL, will set size after pulling")
log.Warnf("Backing Image: cannot get size from URL, will set size after pulling")
}
bi.size = size

Expand All @@ -124,7 +140,8 @@ func (bi *BackingImage) Pull() (resp *rpc.BackingImageResponse, err error) {
bi.lock.Lock()
bi.state = StateFailed
bi.errorMsg = err.Error()
bi.log.WithError(err).Error("Backing Image: failed to pull from remote")
log.WithError(err).Error("Backing Image: failed to pull from remote")
bi.downloader.Cancel()
bi.lock.Unlock()
return
}
Expand All @@ -133,7 +150,7 @@ func (bi *BackingImage) Pull() (resp *rpc.BackingImageResponse, err error) {
}()
go bi.waitForDownloadStartWithLock()

bi.log.Info("Backing Image: pulling backing image")
log.Info("Backing Image: pulling backing image")

return bi.rpcResponse(), nil
}
Expand Down Expand Up @@ -198,24 +215,37 @@ func (bi *BackingImage) Get() (resp *rpc.BackingImageResponse) {

func (bi *BackingImage) Receive(size int64, senderManagerAddress string, portAllocateFunc func(portCount int32) (int32, int32, error), portReleaseFunc func(start, end int32) error) (port int32, err error) {
bi.lock.Lock()
log := bi.log
log.Info("Backing Image: start to receive backing image")

if bi.state != types.DownloadStatePending {
state := bi.state
bi.lock.Unlock()
return 0, fmt.Errorf("invalid state %v for receiving", state)
}

defer func() {
if err != nil {
bi.state = StateFailed
bi.errorMsg = err.Error()
bi.log.WithError(err).Error("Backing Image: failed to receive backing image")
log.WithError(err).Error("Backing Image: failed to receive backing image")
bi.downloader.Cancel()
}
bi.lock.Unlock()
bi.updateCh <- nil
}()

bi.senderManagerAddress = senderManagerAddress
bi.log = bi.log.WithField("senderManagerAddress", senderManagerAddress)
// This means state was pending but somehow the downloader had been initialized.
if err := bi.downloader.InitDownloading(); err != nil {
return 0, errors.Wrapf(err, "failed to ask for the downloader init before receiving")
}
bi.state = types.DownloadStateStarting

if err = bi.checkAndReuseBackingImageFileWithoutLock(); err == nil {
bi.log.Infof("Backing Image: succeeded to reuse the existing backing image file, will skip syncing")
log.Infof("Backing Image: succeeded to reuse the existing backing image file, will skip syncing")
return 0, nil
}
bi.log.Infof("Backing Image: failed to try to check or reuse the possible existing backing image file, will start syncing then: %v", err)
log.Infof("Backing Image: failed to try to check or reuse the possible existing backing image file, will start syncing then: %v", err)

if err := bi.prepareForDownload(); err != nil {
return 0, errors.Wrapf(err, "failed to prepare for backing image receiving")
Expand All @@ -226,22 +256,26 @@ func (bi *BackingImage) Receive(size int64, senderManagerAddress string, portAll
}

bi.size = size
bi.senderManagerAddress = senderManagerAddress
log = log.WithField("senderManagerAddress", senderManagerAddress)
bi.log = log

go func() {
defer func() {
bi.updateCh <- nil
if err := portReleaseFunc(port, port+1); err != nil {
bi.log.WithError(err).Errorf("Failed to release port %v after receiving backing image", port)
log.WithError(err).Errorf("Failed to release port %v after receiving backing image", port)
}
}()

bi.log.Infof("Backing Image: prepare to receive backing image at port %v", port)
log.Infof("Backing Image: prepare to receive backing image at port %v", port)

if err := bi.downloader.Receive(strconv.Itoa(int(port)), filepath.Join(bi.WorkDirectory, types.BackingImageTmpFileName), bi); err != nil {
bi.lock.Lock()
bi.state = StateFailed
bi.errorMsg = err.Error()
bi.log.WithError(err).Errorf("Backing Image: failed to receive backing image from %v", senderManagerAddress)
log.WithError(err).Errorf("Backing Image: failed to receive backing image from %v", senderManagerAddress)
bi.downloader.Cancel()
bi.lock.Unlock()
return
}
Expand All @@ -255,6 +289,7 @@ func (bi *BackingImage) Receive(size int64, senderManagerAddress string, portAll

func (bi *BackingImage) Send(address string, portAllocateFunc func(portCount int32) (int32, int32, error), portReleaseFunc func(start, end int32) error) (err error) {
bi.lock.Lock()
log := bi.log
oldState := bi.state
defer func() {
currentState := bi.state
Expand All @@ -270,7 +305,7 @@ func (bi *BackingImage) Send(address string, portAllocateFunc func(portCount int
if err := bi.validateFiles(); err != nil {
bi.state = StateFailed
bi.errorMsg = err.Error()
bi.log.WithError(err).Error("Backing Image: failed to validate files before sending")
log.WithError(err).Error("Backing Image: failed to validate files before sending")
return errors.Wrapf(err, "cannot send backing image %v to others since the files are invalid", bi.Name)
}
if bi.sendingReference >= types.SendingLimit {
Expand All @@ -285,22 +320,22 @@ func (bi *BackingImage) Send(address string, portAllocateFunc func(portCount int
bi.sendingReference++

go func() {
bi.log.Infof("Backing Image: start to send backing image to address %v", address)
log.Infof("Backing Image: start to send backing image to address %v", address)
defer func() {
bi.lock.Lock()
bi.sendingReference--
bi.lock.Unlock()
bi.updateCh <- nil
if err := portReleaseFunc(port, port+1); err != nil {
bi.log.WithError(err).Errorf("Failed to release port %v after sending backing image", port)
log.WithError(err).Errorf("Failed to release port %v after sending backing image", port)
}
}()

if err := bi.downloader.Send(filepath.Join(bi.WorkDirectory, types.BackingImageFileName), address); err != nil {
bi.log.WithError(err).Errorf("Backing Image: failed to send backing image to address %v", address)
log.WithError(err).Errorf("Backing Image: failed to send backing image to address %v", address)
return
}
bi.log.Infof("Backing Image: done sending backing image to address %v", address)
log.Infof("Backing Image: done sending backing image to address %v", address)
}()

return nil
Expand Down Expand Up @@ -345,6 +380,7 @@ func (bi *BackingImage) checkAndReuseBackingImageFileWithoutLock() error {
bi.processedSize = cfg.Size
bi.progress = 100
bi.state = types.DownloadStateDownloaded
bi.downloader.Cancel()
bi.log.Infof("Backing Image: Directly reuse/introduce the existing file in path %v", backingImagePath)

return nil
Expand Down Expand Up @@ -380,6 +416,7 @@ func (bi *BackingImage) validateFiles() error {
// Don't need to check anything for a failed/pending backing image.
// Let's directly wait for cleanup then re-downloading.
case StatePending:
case StateStarting:
case StateFailed:
default:
return fmt.Errorf("unexpected state for file validation")
Expand All @@ -396,24 +433,21 @@ func (bi *BackingImage) waitForDownloadStartWithLock() {
select {
case <-ticker.C:
count++
bi.lock.RLock()
state := bi.state
bi.lock.RUnlock()
if state != types.DownloadStatePending {
bi.lock.Lock()
if bi.state != types.DownloadStateStarting {
bi.lock.Unlock()
return
}
if count >= RetryCount {
if state == types.DownloadStatePending {
bi.lock.Lock()
bi.state = types.DownloadStateFailed
bi.errorMsg = fmt.Sprintf("failed to wait for download start in %v seconds", RetryCount)
bi.log.Errorf("Backing Image: %v", bi.errorMsg)
bi.lock.Unlock()
bi.downloader.Cancel()
bi.updateCh <- nil
}
bi.state = types.DownloadStateFailed
bi.errorMsg = fmt.Sprintf("failed to wait for download start in %v seconds", RetryCount)
bi.log.Errorf("Backing Image: %v", bi.errorMsg)
bi.lock.Unlock()
bi.downloader.Cancel()
bi.updateCh <- nil
return
}
bi.lock.Unlock()
}
}
}
Expand All @@ -423,33 +457,37 @@ func (bi *BackingImage) completeDownloadWithLock() {
backingImagePath := filepath.Join(bi.WorkDirectory, types.BackingImageFileName)

bi.lock.Lock()
defer bi.lock.Unlock()

if bi.state == StateFailed {
bi.log.Warnf("Backing Image: state somehow becomes %v after downloading, will not continue renaming file", types.DownloadStateFailed)
return
}
log := bi.log

var err error
defer func() {
if err != nil {
bi.state = StateFailed
bi.errorMsg = err.Error()
bi.log.WithError(err).Error("Backing Image: failed to complete download")
if bi.state != StateFailed {
bi.state = StateFailed
bi.errorMsg = err.Error()
log.WithError(err).Error("Backing Image: failed to complete download")
bi.downloader.Cancel()
}
}
bi.lock.Unlock()
}()

if bi.state != StateDownloading {
log.Warnf("Backing Image: invalid state %v after downloading", bi.state)
return
}

tmpFileStat, err := os.Stat(backingImageTmpPath)
if err != nil {
err = errors.Wrapf(err, "failed to check the tmp file after downloading")
return
}
if tmpFileStat.Size() != bi.size {
bi.log.Debugf("Backing Image: update image size %v to the actual file size %v after downloading", bi.size, tmpFileStat.Size())
log.Debugf("Backing Image: update image size %v to the actual file size %v after downloading", bi.size, tmpFileStat.Size())
bi.size = tmpFileStat.Size()
}
if tmpFileStat.Size() != bi.processedSize {
bi.log.Debugf("Backing Image: processed size %v is not equal to the actual file size %v after downloading", bi.processedSize, tmpFileStat.Size())
log.Debugf("Backing Image: processed size %v is not equal to the actual file size %v after downloading", bi.processedSize, tmpFileStat.Size())
bi.processedSize = tmpFileStat.Size()
}

Expand All @@ -469,15 +507,15 @@ func (bi *BackingImage) completeDownloadWithLock() {

bi.progress = 100
bi.state = StateDownloaded
bi.log.Infof("Backing Image: downloaded backing image file")
log.Infof("Backing Image: downloaded backing image file")
return
}

func (bi *BackingImage) UpdateSyncFileProgress(size int64) {
bi.lock.Lock()
defer bi.lock.Unlock()

if bi.state == types.DownloadStatePending {
if bi.state == types.DownloadStateStarting {
bi.state = types.DownloadStateDownloading
}

Expand Down
Loading

0 comments on commit 904cd77

Please sign in to comment.