Skip to content

Commit

Permalink
feat(restore): create incremental restore snapshot
Browse files Browse the repository at this point in the history
longhorn/longhorn-6613

Signed-off-by: Chin-Ya Huang <chin-ya.huang@suse.com>
  • Loading branch information
c3y1huang committed Aug 27, 2024
1 parent 8970454 commit b78093f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 20 deletions.
27 changes: 15 additions & 12 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type Engine struct {
Head *api.Lvol
SnapshotMap map[string]*api.Lvol

IsRestoring bool
IsRestoring bool
RestoringSnapshotName string

// UpdateCh should not be protected by the engine lock
UpdateCh chan interface{}
Expand Down Expand Up @@ -1748,16 +1749,18 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
e.IsRestoring = true

// TODO: support DR volume
if len(e.SnapshotMap) == 0 {
if snapshotName == "" {
snapshotName = util.UUID()
e.log.Infof("Generating a snapshot name %s for the full restore", snapshotName)
}
} else {
if snapshotName == "" {
snapshotName = util.UUID()
e.log.Infof("Generating a snapshot name %s for the incremental restore", snapshotName)
}
switch {
case snapshotName != "":
e.RestoringSnapshotName = snapshotName
e.log.Infof("Using input snapshot name %s for the restore", e.RestoringSnapshotName)
case len(e.SnapshotMap) == 0:
e.RestoringSnapshotName = util.UUID()
e.log.Infof("Using new generated snapshot name %s for the full restore", e.RestoringSnapshotName)
case e.RestoringSnapshotName != "":
e.log.Infof("Using existing snapshot name %s for the incremental restore", e.RestoringSnapshotName)
default:
e.RestoringSnapshotName = util.UUID()
e.log.Infof("Using new generated snapshot name %s for the incremental restore because e.FinalSnapshotName is empty", e.RestoringSnapshotName)
}

defer func() {
Expand Down Expand Up @@ -1791,7 +1794,7 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
err = replicaServiceCli.ReplicaBackupRestore(&client.BackupRestoreRequest{
BackupUrl: backupUrl,
ReplicaName: replicaName,
SnapshotName: snapshotName,
SnapshotName: e.RestoringSnapshotName,
Credential: credential,
ConcurrentLimit: concurrentLimit,
})
Expand Down
42 changes: 34 additions & 8 deletions pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,25 +1926,25 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
lvolName := GetReplicaSnapshotLvolName(r.Name, snapshotName)
r.restore, err = NewRestore(spdkClient, lvolName, snapshotName, backupUrl, backupName, r)
if err != nil {
err = errors.Wrapf(err, "failed to start new restore")
err = errors.Wrap(err, "failed to start new restore")
return grpcstatus.Errorf(grpccodes.Internal, err.Error())
}
} else {
r.log.Info("Resetting the restore for backup %v", backupUrl)
r.log.Infof("Resetting the restore for backup %v", backupUrl)

var lvolName string
var snapshotNameToBeRestored string

validLastRestoredBackup := r.canDoIncrementalRestore(restore, backupUrl, backupName)
if validLastRestoredBackup {
r.log.Infof("Starting an incremental restore for backup %v", backupUrl)
lvolName = GetReplicaSnapshotLvolName(r.Name, restore.LastRestored)
snapshotNameToBeRestored = restore.LastRestored
} else {
r.log.Infof("Starting a full restore for backup %v", backupUrl)
lvolName = GetReplicaSnapshotLvolName(r.Name, snapshotName)
snapshotNameToBeRestored = snapshotName
}

lvolName = GetReplicaSnapshotLvolName(r.Name, snapshotName)
snapshotNameToBeRestored = snapshotName

r.restore.StartNewRestore(backupUrl, backupName, lvolName, snapshotNameToBeRestored, validLastRestoredBackup)
}

Expand Down Expand Up @@ -2052,7 +2052,7 @@ func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client, isFullRes
return r.postFullRestoreOperations(spdkClient, restore)
}

return r.postIncrementalRestoreOperations(restore)
return r.postIncrementalRestoreOperations(spdkClient, restore)
}

func (r *Replica) waitForRestoreComplete() error {
Expand Down Expand Up @@ -2083,7 +2083,33 @@ func (r *Replica) waitForRestoreComplete() error {
return nil
}

func (r *Replica) postIncrementalRestoreOperations(restore *Restore) error {
func (r *Replica) postIncrementalRestoreOperations(spdkClient *spdkclient.Client, restore *Restore) error {
r.log.Infof("Replacing snapshot %v of the restored volume", restore.SnapshotName)

if r.restore.State == btypes.ProgressStateCanceled {
r.log.Info("Doing nothing for canceled backup restoration")
return nil
}

// Delete snapshot; SPDK will coalesce the content into the current head lvol.
r.log.Infof("Deleting snapshot %v for snapshot replacement of the restored volume", restore.SnapshotName)
_, err := r.SnapshotDelete(spdkClient, restore.SnapshotName)
if err != nil {
r.log.WithError(err).Error("Failed to delete snapshot of the restored volume")
return errors.Wrapf(err, "failed to delete snapshot of the restored volume")
}

r.log.Infof("Creating snapshot %v for snapshot replacement of the restored volume", restore.SnapshotName)
opts := &api.SnapshotOptions{
UserCreated: false,
Timestamp: util.Now(),
}
_, err = r.SnapshotCreate(spdkClient, restore.SnapshotName, opts)
if err != nil {
r.log.WithError(err).Error("Failed to take snapshot of the restored volume")
return errors.Wrapf(err, "failed to take snapshot of the restored volume")
}

r.log.Infof("Done running incremental restore %v to lvol %v", restore.BackupURL, restore.LvolName)
return nil
}
Expand Down

0 comments on commit b78093f

Please sign in to comment.