Skip to content

Commit

Permalink
feat(restore): implememt incremental restore
Browse files Browse the repository at this point in the history
longhorn/longhorn-6613

Signed-off-by: Chin-Ya Huang <chin-ya.huang@suse.com>
  • Loading branch information
c3y1huang committed Jul 31, 2024
1 parent 4a76980 commit 717505d
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 4 deletions.
5 changes: 4 additions & 1 deletion pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,7 +1626,10 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
e.log.Infof("Generating a snapshot name %s for the full restore", snapshotName)
}
} else {
return nil, errors.Errorf("incremental restore is not supported yet")
if snapshotName == "" {
snapshotName = util.UUID()
e.log.Infof("Generating a snapshot name %s for the incremental restore", snapshotName)
}
}

resp := &spdkrpc.EngineBackupRestoreResponse{
Expand Down
29 changes: 28 additions & 1 deletion pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1973,14 +1973,18 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
return grpcstatus.Errorf(grpccodes.Internal, err.Error())
}
} else {
r.log.Info("Resetting the restore for backup %v", backupUrl)

var lvolName string
var snapshotNameToBeRestored string

validLastRestoredBackup := r.canDoIncrementalRestore(restore, backupUrl, backupName)
if validLastRestoredBackup {
r.log.Infof("Starting an incremental restore for backup %v", backupUrl)
lvolName = GetReplicaSnapshotLvolName(r.Name, restore.LastRestored)
snapshotNameToBeRestored = restore.LastRestored
} else {
r.log.Infof("Starting a full restore for backup %v", backupUrl)
lvolName = GetReplicaSnapshotLvolName(r.Name, snapshotName)
snapshotNameToBeRestored = snapshotName
}
Expand All @@ -2003,7 +2007,11 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
}
r.log.Infof("Successfully initiated full restore for %v to %v", backupUrl, newRestore.LvolName)
} else {
return fmt.Errorf("incremental restore is not supported yet")
r.log.Infof("Starting an incremental restore for backup %v", backupUrl)
if err := r.backupRestoreIncrementally(backupUrl, newRestore.LastRestored, newRestore.LvolName, concurrentLimit); err != nil {
return errors.Wrapf(err, "failed to start incremental backup restore")
}
r.log.Infof("Successfully initiated incremental restore for %v to %v", backupUrl, newRestore.LvolName)
}

go func() {
Expand All @@ -2016,6 +2024,25 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh

}

func (r *Replica) backupRestoreIncrementally(backupURL, lastRestored, snapshotLvolName string, concurrentLimit int32) error {
backupURL = butil.UnescapeURL(backupURL)

logrus.WithFields(logrus.Fields{
"backupURL": backupURL,
"lastRestored": lastRestored,
"snapshotLvolName": snapshotLvolName,
"concurrentLimit": concurrentLimit,
}).Info("Start restoring backup incrementally")

return backupstore.RestoreDeltaBlockBackupIncrementally(r.ctx, &backupstore.DeltaRestoreConfig{
BackupURL: backupURL,
DeltaOps: r.restore,
LastBackupName: lastRestored,
Filename: snapshotLvolName,
ConcurrentLimit: int32(concurrentLimit),
})
}

func (r *Replica) backupRestore(backupURL, snapshotLvolName string, concurrentLimit int32) error {
backupURL = butil.UnescapeURL(backupURL)

Expand Down
4 changes: 2 additions & 2 deletions pkg/spdk/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (r *Restore) DeepCopy() *Restore {
}
}

func (r *Restore) OpenVolumeDev(volDevName string) (*os.File, string, error) {
func (r *Restore) OpenVolumeDev(volDevName string, dmDeviceAndEndpointCleanupRequired bool) (*os.File, string, error) {
lvolName := r.replica.Name

r.log.Info("Unexposing lvol bdev before restoration")
Expand Down Expand Up @@ -141,7 +141,7 @@ func (r *Restore) OpenVolumeDev(volDevName string) (*os.File, string, error) {
if err != nil {
return nil, "", errors.Wrapf(err, "failed to create NVMe initiator for lvol bdev %v", lvolName)
}
if _, err := initiator.Start(r.ip, strconv.Itoa(int(r.port)), false); err != nil {
if _, err := initiator.Start(r.ip, strconv.Itoa(int(r.port)), dmDeviceAndEndpointCleanupRequired); err != nil {
return nil, "", errors.Wrapf(err, "failed to start NVMe initiator for lvol bdev %v", lvolName)
}
r.initiator = initiator
Expand Down

0 comments on commit 717505d

Please sign in to comment.