Skip to content

Commit

Permalink
feat(restore): implement backup restore finish
Browse files Browse the repository at this point in the history
longhorn/longhorn-6613

Signed-off-by: Chin-Ya Huang <chin-ya.huang@suse.com>
  • Loading branch information
c3y1huang committed Aug 9, 2024
1 parent 0197bd0 commit 9a41656
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 10 deletions.
55 changes: 55 additions & 0 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1651,6 +1651,14 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
}
}

defer func() {
go func() {
if err := e.completeBackupRestore(spdkClient); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
}
}()
}()

resp := &spdkrpc.EngineBackupRestoreResponse{
Errors: map[string]string{},
}
Expand Down Expand Up @@ -1680,6 +1688,53 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
return resp, nil
}

func (e *Engine) completeBackupRestore(spdkClient *spdkclient.Client) error {
if err := e.waitForRestoreComplete(); err != nil {
return errors.Wrapf(err, "failed to wait for restore complete")
}

return e.BackupRestoreFinish(spdkClient)
}

func (e *Engine) waitForRestoreComplete() error {
periodicChecker := time.NewTicker(time.Duration(restorePeriodicRefreshInterval.Seconds()) * time.Second)
defer periodicChecker.Stop()

for range periodicChecker.C {
isRestoring := false
for replicaName, replicaAddress := range e.ReplicaAddressMap {
log := e.log.WithFields(logrus.Fields{
"replica": replicaName,
"address": replicaAddress,
})
log.Trace("Checking replica restore status")

if isRestoring {
break
}

replicaServiceCli, err := GetServiceClient(replicaAddress)
if err != nil {
return errors.Wrapf(err, "failed to get replica %v service client %s", replicaName, replicaAddress)
}

status, err := replicaServiceCli.ReplicaRestoreStatus(replicaName)
if err != nil {
return errors.Wrapf(err, "failed to check replica %s restore status", replicaName)
}

isRestoring = status.IsRestoring
}

if !isRestoring {
e.log.Info("Backup restoration completed successfully")
return nil
}
}

return errors.Errorf("failed to wait for engine %s restore complete", e.Name)
}

func (e *Engine) BackupRestoreFinish(spdkClient *spdkclient.Client) error {
e.Lock()
defer e.Unlock()
Expand Down
31 changes: 21 additions & 10 deletions pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1931,7 +1931,17 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
}
}()

if newRestore.LastRestored == "" {
isFullRestore := newRestore.LastRestored == ""

defer func() {
go func() {
if err := r.completeBackupRestore(spdkClient, isFullRestore); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
}
}()
}()

if isFullRestore {
r.log.Infof("Starting a new full restore for backup %v", backupUrl)
if err := r.backupRestore(backupUrl, newRestore.LvolName, concurrentLimit); err != nil {
return errors.Wrapf(err, "failed to start full backup restore")
Expand All @@ -1945,12 +1955,6 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
r.log.Infof("Successfully initiated incremental restore for %v to %v", backupUrl, newRestore.LvolName)
}

go func() {
if err := r.completeBackupRestore(spdkClient); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
}
}()

return nil

}
Expand Down Expand Up @@ -2003,7 +2007,7 @@ func (r *Replica) canDoIncrementalRestore(restore *Restore, backupURL, requested
return true
}

func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client) (err error) {
func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client, isFullRestore bool) (err error) {
defer func() {
if extraErr := r.finishRestore(err); extraErr != nil {
r.log.WithError(extraErr).Error("Failed to finish backup restore")
Expand All @@ -2018,9 +2022,11 @@ func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client) (err erro
restore := r.restore.DeepCopy()
r.RUnlock()

// TODO: Support postIncrementalRestoreOperations
if isFullRestore {
return r.postFullRestoreOperations(spdkClient, restore)
}

return r.postFullRestoreOperations(spdkClient, restore)
return r.postIncrementalRestoreOperations(restore)
}

func (r *Replica) waitForRestoreComplete() error {
Expand Down Expand Up @@ -2051,6 +2057,11 @@ func (r *Replica) waitForRestoreComplete() error {
return nil
}

func (r *Replica) postIncrementalRestoreOperations(restore *Restore) error {
r.log.Infof("Done running incremental restore %v to lvol %v", restore.BackupURL, restore.LvolName)
return nil
}

func (r *Replica) postFullRestoreOperations(spdkClient *spdkclient.Client, restore *Restore) error {
if r.restore.State == btypes.ProgressStateCanceled {
r.log.Info("Doing nothing for canceled backup restoration")
Expand Down

0 comments on commit 9a41656

Please sign in to comment.