From 717505dc7b93691100e3bac9516166a93cf56850 Mon Sep 17 00:00:00 2001 From: Chin-Ya Huang Date: Mon, 29 Jul 2024 15:42:02 +0800 Subject: [PATCH] feat(restore): implememt incremental restore longhorn/longhorn-6613 Signed-off-by: Chin-Ya Huang --- pkg/spdk/engine.go | 5 ++++- pkg/spdk/replica.go | 29 ++++++++++++++++++++++++++++- pkg/spdk/restore.go | 4 ++-- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/pkg/spdk/engine.go b/pkg/spdk/engine.go index e32f47ef..73d72027 100644 --- a/pkg/spdk/engine.go +++ b/pkg/spdk/engine.go @@ -1626,7 +1626,10 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN e.log.Infof("Generating a snapshot name %s for the full restore", snapshotName) } } else { - return nil, errors.Errorf("incremental restore is not supported yet") + if snapshotName == "" { + snapshotName = util.UUID() + e.log.Infof("Generating a snapshot name %s for the incremental restore", snapshotName) + } } resp := &spdkrpc.EngineBackupRestoreResponse{ diff --git a/pkg/spdk/replica.go b/pkg/spdk/replica.go index 7bb243c9..77b5d492 100644 --- a/pkg/spdk/replica.go +++ b/pkg/spdk/replica.go @@ -1973,14 +1973,18 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh return grpcstatus.Errorf(grpccodes.Internal, err.Error()) } } else { + r.log.Info("Resetting the restore for backup %v", backupUrl) + var lvolName string var snapshotNameToBeRestored string validLastRestoredBackup := r.canDoIncrementalRestore(restore, backupUrl, backupName) if validLastRestoredBackup { + r.log.Infof("Starting an incremental restore for backup %v", backupUrl) lvolName = GetReplicaSnapshotLvolName(r.Name, restore.LastRestored) snapshotNameToBeRestored = restore.LastRestored } else { + r.log.Infof("Starting a full restore for backup %v", backupUrl) lvolName = GetReplicaSnapshotLvolName(r.Name, snapshotName) snapshotNameToBeRestored = snapshotName } @@ -2003,7 +2007,11 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh } r.log.Infof("Successfully initiated full restore for %v to %v", backupUrl, newRestore.LvolName) } else { - return fmt.Errorf("incremental restore is not supported yet") + r.log.Infof("Starting an incremental restore for backup %v", backupUrl) + if err := r.backupRestoreIncrementally(backupUrl, newRestore.LastRestored, newRestore.LvolName, concurrentLimit); err != nil { + return errors.Wrapf(err, "failed to start incremental backup restore") + } + r.log.Infof("Successfully initiated incremental restore for %v to %v", backupUrl, newRestore.LvolName) } go func() { @@ -2016,6 +2024,25 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh } +func (r *Replica) backupRestoreIncrementally(backupURL, lastRestored, snapshotLvolName string, concurrentLimit int32) error { + backupURL = butil.UnescapeURL(backupURL) + + logrus.WithFields(logrus.Fields{ + "backupURL": backupURL, + "lastRestored": lastRestored, + "snapshotLvolName": snapshotLvolName, + "concurrentLimit": concurrentLimit, + }).Info("Start restoring backup incrementally") + + return backupstore.RestoreDeltaBlockBackupIncrementally(r.ctx, &backupstore.DeltaRestoreConfig{ + BackupURL: backupURL, + DeltaOps: r.restore, + LastBackupName: lastRestored, + Filename: snapshotLvolName, + ConcurrentLimit: int32(concurrentLimit), + }) +} + func (r *Replica) backupRestore(backupURL, snapshotLvolName string, concurrentLimit int32) error { backupURL = butil.UnescapeURL(backupURL) diff --git a/pkg/spdk/restore.go b/pkg/spdk/restore.go index 74c8b08e..bf902085 100644 --- a/pkg/spdk/restore.go +++ b/pkg/spdk/restore.go @@ -113,7 +113,7 @@ func (r *Restore) DeepCopy() *Restore { } } -func (r *Restore) OpenVolumeDev(volDevName string) (*os.File, string, error) { +func (r *Restore) OpenVolumeDev(volDevName string, dmDeviceAndEndpointCleanupRequired bool) (*os.File, string, error) { lvolName := r.replica.Name r.log.Info("Unexposing lvol bdev before restoration") @@ -141,7 +141,7 @@ func (r *Restore) OpenVolumeDev(volDevName string) (*os.File, string, error) { if err != nil { return nil, "", errors.Wrapf(err, "failed to create NVMe initiator for lvol bdev %v", lvolName) } - if _, err := initiator.Start(r.ip, strconv.Itoa(int(r.port)), false); err != nil { + if _, err := initiator.Start(r.ip, strconv.Itoa(int(r.port)), dmDeviceAndEndpointCleanupRequired); err != nil { return nil, "", errors.Wrapf(err, "failed to start NVMe initiator for lvol bdev %v", lvolName) } r.initiator = initiator