Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(v2): DR volume #183

Merged
merged 4 commits into from
Aug 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions deltablock.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,9 @@ func RestoreDeltaBlockBackup(ctx context.Context, config *DeltaRestoreConfig) er
}
defer func() {
if err != nil {
_ = deltaOps.CloseVolumeDev(volDev)
if _err := deltaOps.CloseVolumeDev(volDev); _err != nil {
logrus.WithError(_err).Warnf("Failed to close volume device %v", volDevName)
}
}
}()

Expand Down Expand Up @@ -775,7 +777,10 @@ func RestoreDeltaBlockBackup(ctx context.Context, config *DeltaRestoreConfig) er
currentProgress := 0

defer func() {
_ = deltaOps.CloseVolumeDev(volDev)
if _err := deltaOps.CloseVolumeDev(volDev); _err != nil {
logrus.WithError(_err).Warnf("Failed to close volume device %v", volDevName)
}

deltaOps.UpdateRestoreStatus(volDevName, currentProgress, err)
if unlockErr := lock.Unlock(); unlockErr != nil {
logrus.WithError(unlockErr).Warn("Failed to unlock")
Expand Down Expand Up @@ -843,8 +848,9 @@ func RestoreDeltaBlockBackupIncrementally(ctx context.Context, config *DeltaRest
lastBackupName := config.LastBackupName
deltaOps := config.DeltaOps
if deltaOps == nil {
return fmt.Errorf("missing DeltaBlockBackupOperations")
return fmt.Errorf("missing DeltaRestoreOperations")
}

bsDriver, err := GetBackupStoreDriver(backupURL)
if err != nil {
return err
Expand Down Expand Up @@ -894,14 +900,16 @@ func RestoreDeltaBlockBackupIncrementally(ctx context.Context, config *DeltaRest
}
}

volDev, err := os.Create(volDevName)
volDev, volDevPath, err := deltaOps.OpenVolumeDev(volDevName)
if err != nil {
return err
return errors.Wrapf(err, "failed to open volume device %v", volDevName)
}
defer func() {
// make sure to close the device
if err != nil {
_ = volDev.Close()
if _err := deltaOps.CloseVolumeDev(volDev); _err != nil {
logrus.WithError(_err).Warnf("Failed to close volume device %v", volDevName)
}
}
}()

Expand Down Expand Up @@ -933,10 +941,18 @@ func RestoreDeltaBlockBackupIncrementally(ctx context.Context, config *DeltaRest
return err
}
go func() {
defer volDev.Close()
var err error
finalProgress := 0

defer func() {
if _err := deltaOps.CloseVolumeDev(volDev); _err != nil {
logrus.WithError(_err).Warnf("Failed to close volume device %v", volDevName)
}

deltaOps.UpdateRestoreStatus(volDevName, finalProgress, err)

if unlockErr := lock.Unlock(); unlockErr != nil {
logrus.WithError(err).Warn("Failed to unlock")
logrus.WithError(unlockErr).Warn("Failed to unlock")
}
}()

Expand All @@ -945,20 +961,20 @@ func RestoreDeltaBlockBackupIncrementally(ctx context.Context, config *DeltaRest
// closed.
// https://github.com/longhorn/longhorn/issues/2503
// We want to truncate regular files, but not device
if stat.Mode()&os.ModeType == 0 {
log.Debugf("Truncate %v to size %v", volDevName, vol.Size)
if err := volDev.Truncate(vol.Size); err != nil {
deltaOps.UpdateRestoreStatus(volDevName, 0, err)
if stat.Mode().IsRegular() {
log.Infof("Truncate %v to size %v", volDevName, vol.Size)
err = volDev.Truncate(vol.Size)
if err != nil {
return
}
}

if err := performIncrementalRestore(ctx, bsDriver, config, srcVolumeName, volDevName, lastBackup, backup); err != nil {
deltaOps.UpdateRestoreStatus(volDevName, 0, err)
err = performIncrementalRestore(ctx, bsDriver, config, srcVolumeName, volDevPath, lastBackup, backup)
if err != nil {
return
}

deltaOps.UpdateRestoreStatus(volDevName, PROGRESS_PERCENTAGE_BACKUP_TOTAL, nil)
finalProgress = PROGRESS_PERCENTAGE_BACKUP_TOTAL
}()
return nil
}
Expand Down Expand Up @@ -1107,7 +1123,7 @@ func restoreBlocks(ctx context.Context, bsDriver BackupStoreDriver, deltaOps Del
}

func performIncrementalRestore(ctx context.Context, bsDriver BackupStoreDriver, config *DeltaRestoreConfig,
srcVolumeName, volDevName string, lastBackup *Backup, backup *Backup) error {
srcVolumeName, volDevPath string, lastBackup *Backup, backup *Backup) error {
var err error
concurrentLimit := config.ConcurrentLimit

Expand All @@ -1119,7 +1135,7 @@ func performIncrementalRestore(ctx context.Context, bsDriver BackupStoreDriver,

errorChans := []<-chan error{errChan}
for i := 0; i < int(concurrentLimit); i++ {
errorChans = append(errorChans, restoreBlocks(ctx, bsDriver, config.DeltaOps, config.Filename, srcVolumeName, blockChan, progress))
errorChans = append(errorChans, restoreBlocks(ctx, bsDriver, config.DeltaOps, volDevPath, srcVolumeName, blockChan, progress))
}

mergedErrChan := mergeErrorChannels(ctx, errorChans...)
Expand Down