Skip to content

Commit

Permalink
feat(restore): implement backup restore finish
Browse files Browse the repository at this point in the history
longhorn/longhorn-6613

Signed-off-by: Chin-Ya Huang <chin-ya.huang@suse.com>
  • Loading branch information
c3y1huang committed Aug 16, 2024
1 parent 42e9ff1 commit a45546e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 10 deletions.
64 changes: 64 additions & 0 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1759,6 +1759,14 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
}
}

defer func() {
go func() {
if err := e.completeBackupRestore(spdkClient); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
}
}()
}()

resp := &spdkrpc.EngineBackupRestoreResponse{
Errors: map[string]string{},
}
Expand Down Expand Up @@ -1796,6 +1804,62 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
return resp, nil
}

func (e *Engine) completeBackupRestore(spdkClient *spdkclient.Client) error {
if err := e.waitForRestoreComplete(); err != nil {
return errors.Wrapf(err, "failed to wait for restore complete")
}

return e.BackupRestoreFinish(spdkClient)
}

func (e *Engine) waitForRestoreComplete() error {
periodicChecker := time.NewTicker(time.Duration(restorePeriodicRefreshInterval.Seconds()) * time.Second)
defer periodicChecker.Stop()

var err error
for range periodicChecker.C {
isReplicaRestoreCompleted := true
for replicaName, replicaAddress := range e.ReplicaAddressMap {
isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaAddress)
if err != nil {
return errors.Wrapf(err, "failed to check replica %s restore status", replicaName)
}

if !isReplicaRestoreCompleted {
break
}
}

if isReplicaRestoreCompleted {
e.log.Info("Backup restoration completed successfully")
return nil
}
}

return errors.Errorf("failed to wait for engine %s restore complete", e.Name)
}

func (e *Engine) isReplicaRestoreCompleted(replicaName, replicaAddress string) (bool, error) {
log := e.log.WithFields(logrus.Fields{
"replica": replicaName,
"address": replicaAddress,
})
log.Trace("Checking replica restore status")

replicaServiceCli, err := GetServiceClient(replicaAddress)
if err != nil {
return false, errors.Wrapf(err, "failed to get replica %v service client %s", replicaName, replicaAddress)
}
defer replicaServiceCli.Close()

status, err := replicaServiceCli.ReplicaRestoreStatus(replicaName)
if err != nil {
return false, errors.Wrapf(err, "failed to check replica %s restore status", replicaName)
}

return !status.IsRestoring, nil
}

func (e *Engine) BackupRestoreFinish(spdkClient *spdkclient.Client) error {
e.Lock()
defer e.Unlock()
Expand Down
31 changes: 21 additions & 10 deletions pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1952,7 +1952,17 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
}
}()

if newRestore.LastRestored == "" {
isFullRestore := newRestore.LastRestored == ""

defer func() {
go func() {
if err := r.completeBackupRestore(spdkClient, isFullRestore); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
}
}()
}()

if isFullRestore {
r.log.Infof("Starting a new full restore for backup %v", backupUrl)
if err := r.backupRestore(backupUrl, newRestore.LvolName, concurrentLimit); err != nil {
return errors.Wrapf(err, "failed to start full backup restore")
Expand All @@ -1966,12 +1976,6 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
r.log.Infof("Successfully initiated incremental restore for %v to %v", backupUrl, newRestore.LvolName)
}

go func() {
if err := r.completeBackupRestore(spdkClient); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
}
}()

return nil

}
Expand Down Expand Up @@ -2024,7 +2028,7 @@ func (r *Replica) canDoIncrementalRestore(restore *Restore, backupURL, requested
return true
}

func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client) (err error) {
func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client, isFullRestore bool) (err error) {
defer func() {
if extraErr := r.finishRestore(err); extraErr != nil {
r.log.WithError(extraErr).Error("Failed to finish backup restore")
Expand All @@ -2039,9 +2043,11 @@ func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client) (err erro
restore := r.restore.DeepCopy()
r.RUnlock()

// TODO: Support postIncrementalRestoreOperations
if isFullRestore {
return r.postFullRestoreOperations(spdkClient, restore)
}

return r.postFullRestoreOperations(spdkClient, restore)
return r.postIncrementalRestoreOperations(restore)
}

func (r *Replica) waitForRestoreComplete() error {
Expand Down Expand Up @@ -2072,6 +2078,11 @@ func (r *Replica) waitForRestoreComplete() error {
return nil
}

func (r *Replica) postIncrementalRestoreOperations(restore *Restore) error {
r.log.Infof("Done running incremental restore %v to lvol %v", restore.BackupURL, restore.LvolName)
return nil
}

func (r *Replica) postFullRestoreOperations(spdkClient *spdkclient.Client, restore *Restore) error {
if r.restore.State == btypes.ProgressStateCanceled {
r.log.Info("Doing nothing for canceled backup restoration")
Expand Down

0 comments on commit a45546e

Please sign in to comment.