From 0d60baee717580e3570b921d87a6791887bbae8d Mon Sep 17 00:00:00 2001 From: Chin-Ya Huang Date: Mon, 5 Aug 2024 10:28:26 +0800 Subject: [PATCH] feat(restore): implement backup restore finish longhorn/longhorn-6613 Signed-off-by: Chin-Ya Huang --- pkg/spdk/engine.go | 64 +++++++++++++++++++++++++++++++++++++++++++++ pkg/spdk/replica.go | 31 +++++++++++++++------- 2 files changed, 85 insertions(+), 10 deletions(-) diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index 4421fbac..2834454d 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -1759,6 +1759,14 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN } } + defer func() { + go func() { + if err := e.completeBackupRestore(spdkClient); err != nil { + logrus.WithError(err).Warn("Failed to complete backup restore") + } + }() + }() + resp := &spdkrpc.EngineBackupRestoreResponse{ Errors: map[string]string{}, } @@ -1796,6 +1804,62 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN return resp, nil } +func (e *Engine) completeBackupRestore(spdkClient *spdkclient.Client) error { + if err := e.waitForRestoreComplete(); err != nil { + return errors.Wrapf(err, "failed to wait for restore complete") + } + + return e.BackupRestoreFinish(spdkClient) +} + +func (e *Engine) waitForRestoreComplete() error { + periodicChecker := time.NewTicker(time.Duration(restorePeriodicRefreshInterval.Seconds()) * time.Second) + defer periodicChecker.Stop() + + var err error + for range periodicChecker.C { + isReplicaRestoreCompleted := true + for replicaName, replicaAddress := range e.ReplicaAddressMap { + isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaAddress) + if err != nil { + return errors.Wrapf(err, "failed to check replica %s restore status", replicaName) + } + + if !isReplicaRestoreCompleted { + break + } + } + + if isReplicaRestoreCompleted { + e.log.Info("Backup restoration completed successfully") + return nil + } + } + + return errors.Errorf("failed to wait for engine %s restore complete", e.Name) +} + +func (e *Engine) isReplicaRestoreCompleted(replicaName, replicaAddress string) (bool, error) { + log := e.log.WithFields(logrus.Fields{ + "replica": replicaName, + "address": replicaAddress, + }) + log.Trace("Checking replica restore status") + + replicaServiceCli, err := GetServiceClient(replicaAddress) + if err != nil { + return false, errors.Wrapf(err, "failed to get replica %v service client %s", replicaName, replicaAddress) + } + defer replicaServiceCli.Close() + + status, err := replicaServiceCli.ReplicaRestoreStatus(replicaName) + if err != nil { + return false, errors.Wrapf(err, "failed to check replica %s restore status", replicaName) + } + + return !status.IsRestoring, nil +} + func (e *Engine) BackupRestoreFinish(spdkClient *spdkclient.Client) error { e.Lock() defer e.Unlock() diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index bb6e2d02..5c7a9e99 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -1952,7 +1952,17 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh } }() - if newRestore.LastRestored == "" { + isFullRestore := newRestore.LastRestored == "" + + defer func() { + go func() { + if err := r.completeBackupRestore(spdkClient, isFullRestore); err != nil { + logrus.WithError(err).Warn("Failed to complete backup restore") + } + }() + }() + + if isFullRestore { r.log.Infof("Starting a new full restore for backup %v", backupUrl) if err := r.backupRestore(backupUrl, newRestore.LvolName, concurrentLimit); err != nil { return errors.Wrapf(err, "failed to start full backup restore") @@ -1966,12 +1976,6 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh r.log.Infof("Successfully initiated incremental restore for %v to %v", backupUrl, newRestore.LvolName) } - go func() { - if err := r.completeBackupRestore(spdkClient); err != nil { - logrus.WithError(err).Warn("Failed to complete backup restore") - } - }() - return nil } @@ -2024,7 +2028,7 @@ func (r *Replica) canDoIncrementalRestore(restore *Restore, backupURL, requested return true } -func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client) (err error) { +func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client, isFullRestore bool) (err error) { defer func() { if extraErr := r.finishRestore(err); extraErr != nil { r.log.WithError(extraErr).Error("Failed to finish backup restore") @@ -2039,9 +2043,11 @@ func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client) (err erro restore := r.restore.DeepCopy() r.RUnlock() - // TODO: Support postIncrementalRestoreOperations + if isFullRestore { + return r.postFullRestoreOperations(spdkClient, restore) + } - return r.postFullRestoreOperations(spdkClient, restore) + return r.postIncrementalRestoreOperations(restore) } func (r *Replica) waitForRestoreComplete() error { @@ -2072,6 +2078,11 @@ func (r *Replica) waitForRestoreComplete() error { return nil } +func (r *Replica) postIncrementalRestoreOperations(restore *Restore) error { + r.log.Infof("Done running incremental restore %v to lvol %v", restore.BackupURL, restore.LvolName) + return nil +} + func (r *Replica) postFullRestoreOperations(spdkClient *spdkclient.Client, restore *Restore) error { if r.restore.State == btypes.ProgressStateCanceled { r.log.Info("Doing nothing for canceled backup restoration")