Skip to content

Commit

Permalink
feat(restore): implememt incremental restore
Browse files Browse the repository at this point in the history
longhorn/longhorn-6613

Signed-off-by: Chin-Ya Huang <chin-ya.huang@suse.com>
  • Loading branch information
c3y1huang committed Aug 23, 2024
1 parent 10f2cae commit ba2b956
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
5 changes: 4 additions & 1 deletion pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1754,7 +1754,10 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
e.log.Infof("Generating a snapshot name %s for the full restore", snapshotName)
}
} else {
return nil, errors.Errorf("incremental restore is not supported yet")
if snapshotName == "" {
snapshotName = util.UUID()
e.log.Infof("Generating a snapshot name %s for the incremental restore", snapshotName)
}
}

resp := &spdkrpc.EngineBackupRestoreResponse{
Expand Down
29 changes: 28 additions & 1 deletion pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1930,14 +1930,18 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
return grpcstatus.Errorf(grpccodes.Internal, err.Error())
}
} else {
r.log.Info("Resetting the restore for backup %v", backupUrl)

var lvolName string
var snapshotNameToBeRestored string

validLastRestoredBackup := r.canDoIncrementalRestore(restore, backupUrl, backupName)
if validLastRestoredBackup {
r.log.Infof("Starting an incremental restore for backup %v", backupUrl)
lvolName = GetReplicaSnapshotLvolName(r.Name, restore.LastRestored)
snapshotNameToBeRestored = restore.LastRestored
} else {
r.log.Infof("Starting a full restore for backup %v", backupUrl)
lvolName = GetReplicaSnapshotLvolName(r.Name, snapshotName)
snapshotNameToBeRestored = snapshotName
}
Expand All @@ -1960,7 +1964,11 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
}
r.log.Infof("Successfully initiated full restore for %v to %v", backupUrl, newRestore.LvolName)
} else {
return fmt.Errorf("incremental restore is not supported yet")
r.log.Infof("Starting an incremental restore for backup %v", backupUrl)
if err := r.backupRestoreIncrementally(backupUrl, newRestore.LastRestored, newRestore.LvolName, concurrentLimit); err != nil {
return errors.Wrapf(err, "failed to start incremental backup restore")
}
r.log.Infof("Successfully initiated incremental restore for %v to %v", backupUrl, newRestore.LvolName)
}

go func() {
Expand All @@ -1973,6 +1981,25 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh

}

func (r *Replica) backupRestoreIncrementally(backupURL, lastRestored, snapshotLvolName string, concurrentLimit int32) error {
backupURL = butil.UnescapeURL(backupURL)

logrus.WithFields(logrus.Fields{
"backupURL": backupURL,
"lastRestored": lastRestored,
"snapshotLvolName": snapshotLvolName,
"concurrentLimit": concurrentLimit,
}).Info("Start restoring backup incrementally")

return backupstore.RestoreDeltaBlockBackupIncrementally(r.ctx, &backupstore.DeltaRestoreConfig{
BackupURL: backupURL,
DeltaOps: r.restore,
LastBackupName: lastRestored,
Filename: snapshotLvolName,
ConcurrentLimit: int32(concurrentLimit),
})
}

func (r *Replica) backupRestore(backupURL, snapshotLvolName string, concurrentLimit int32) error {
backupURL = butil.UnescapeURL(backupURL)

Expand Down

0 comments on commit ba2b956

Please sign in to comment.