Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(v2): DR volume #192

Merged
merged 7 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ toolchain go1.22.6
require (
github.com/0xPolygon/polygon-edge v1.3.3
github.com/google/uuid v1.6.0
github.com/longhorn/backupstore v0.0.0-20240823072635-7afd6aa10d3e
github.com/longhorn/backupstore v0.0.0-20240827054225-fe89e488b75f
github.com/longhorn/go-common-libs v0.0.0-20240821134112-907f57efd48f
github.com/longhorn/go-spdk-helper v0.0.0-20240820144231-33c0873802ff
github.com/longhorn/types v0.0.0-20240725040629-473d671316c4
github.com/longhorn/types v0.0.0-20240827042720-af8f10eb57cd
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.3
go.uber.org/multierr v1.11.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/longhorn/backupstore v0.0.0-20240823072635-7afd6aa10d3e h1:Kvt/BqiHKaOlTPSM2HFz0VupuQM0laln67rMIDCRYxQ=
github.com/longhorn/backupstore v0.0.0-20240823072635-7afd6aa10d3e/go.mod h1:N4cqNhSs4VUw9aGbO2OfyiIvJL7/L53hUrNiT73UN+U=
github.com/longhorn/backupstore v0.0.0-20240827054225-fe89e488b75f h1:/Wo/leT2yrMmiDieCGhzqyzXb9FNsWoGeYWNfuf29KA=
github.com/longhorn/backupstore v0.0.0-20240827054225-fe89e488b75f/go.mod h1:N4cqNhSs4VUw9aGbO2OfyiIvJL7/L53hUrNiT73UN+U=
github.com/longhorn/go-common-libs v0.0.0-20240821134112-907f57efd48f h1:hjqUs3WVodkzrWwlUMVsnKAlom3uohoNlhZBGLsRvQY=
github.com/longhorn/go-common-libs v0.0.0-20240821134112-907f57efd48f/go.mod h1:Qv34svr/msf6XoUwnrltNBTwMhQljbHEhb5ZKWiRdxo=
github.com/longhorn/go-spdk-helper v0.0.0-20240820144231-33c0873802ff h1:8vR29tkbmzmdqRVtOo5kL7Rs7nfhA6duXsmetIh1Tbg=
github.com/longhorn/go-spdk-helper v0.0.0-20240820144231-33c0873802ff/go.mod h1:Bzz7kGNYikAJqpmeV3cgN8jP1y9M+/oaiBc5iolIxuA=
github.com/longhorn/nsfilelock v0.0.0-20200723175406-fa7c83ad0003 h1:Jw9uANsGcHTxp6HcC++/vN17LfeuDmozHI2j6DoZf5E=
github.com/longhorn/nsfilelock v0.0.0-20200723175406-fa7c83ad0003/go.mod h1:0CLeXlf59Lg6C0kjLSDf47ft73Dh37CwymYRKWwAn04=
github.com/longhorn/types v0.0.0-20240725040629-473d671316c4 h1:L2g0sIJ2fXt4BSFRYNnF6ObtKryCUFm9qLcCXHWssCk=
github.com/longhorn/types v0.0.0-20240725040629-473d671316c4/go.mod h1:KlJuZB8NfHchWshYxYgV9pPIxBKC04Vq05G2TfgMf7w=
github.com/longhorn/types v0.0.0-20240827042720-af8f10eb57cd h1:AwVxaFaxLPmyl++SyigaZZw8u+Ggun7HlcmNgNqyhjs=
github.com/longhorn/types v0.0.0-20240827042720-af8f10eb57cd/go.mod h1:KlJuZB8NfHchWshYxYgV9pPIxBKC04Vq05G2TfgMf7w=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
Expand Down
12 changes: 0 additions & 12 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,18 +860,6 @@ func (c *SPDKClient) ReplicaBackupRestore(req *BackupRestoreRequest) error {
return err
}

func (c *SPDKClient) EngineBackupRestoreFinish(engineName string) error {
client := c.getSPDKServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout)
defer cancel()

_, err := client.EngineBackupRestoreFinish(ctx, &spdkrpc.EngineBackupRestoreFinishRequest{
EngineName: engineName,
})

return err
}

func (c *SPDKClient) EngineRestoreStatus(engineName string) (*spdkrpc.RestoreStatusResponse, error) {
client := c.getSPDKServiceClient()
ctx, cancel := context.WithTimeout(context.Background(), GRPCServiceTimeout)
Expand Down
93 changes: 83 additions & 10 deletions pkg/spdk/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type Engine struct {
Head *api.Lvol
SnapshotMap map[string]*api.Lvol

IsRestoring bool
IsRestoring bool
RestoringSnapshotName string
shuo-wu marked this conversation as resolved.
Show resolved Hide resolved

// UpdateCh should not be protected by the engine lock
UpdateCh chan interface{}
Expand Down Expand Up @@ -1747,16 +1748,28 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN

e.IsRestoring = true

// TODO: support DR volume
if len(e.SnapshotMap) == 0 {
if snapshotName == "" {
snapshotName = util.UUID()
e.log.Infof("Generating a snapshot name %s for the full restore", snapshotName)
}
} else {
return nil, errors.Errorf("incremental restore is not supported yet")
switch {
case snapshotName != "":
e.RestoringSnapshotName = snapshotName
e.log.Infof("Using input snapshot name %s for the restore", e.RestoringSnapshotName)
case len(e.SnapshotMap) == 0:
e.RestoringSnapshotName = util.UUID()
e.log.Infof("Using new generated snapshot name %s for the full restore", e.RestoringSnapshotName)
case e.RestoringSnapshotName != "":
e.log.Infof("Using existing snapshot name %s for the incremental restore", e.RestoringSnapshotName)
default:
e.RestoringSnapshotName = util.UUID()
e.log.Infof("Using new generated snapshot name %s for the incremental restore because e.FinalSnapshotName is empty", e.RestoringSnapshotName)
}

defer func() {
go func() {
if err := e.completeBackupRestore(spdkClient); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
}
}()
}()

resp := &spdkrpc.EngineBackupRestoreResponse{
Errors: map[string]string{},
}
Expand All @@ -1780,7 +1793,7 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
err = replicaServiceCli.ReplicaBackupRestore(&client.BackupRestoreRequest{
BackupUrl: backupUrl,
ReplicaName: replicaName,
SnapshotName: snapshotName,
SnapshotName: e.RestoringSnapshotName,
Credential: credential,
ConcurrentLimit: concurrentLimit,
})
Expand All @@ -1794,6 +1807,66 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN
return resp, nil
}

func (e *Engine) completeBackupRestore(spdkClient *spdkclient.Client) error {
if err := e.waitForRestoreComplete(); err != nil {
return errors.Wrapf(err, "failed to wait for restore complete")
}

return e.BackupRestoreFinish(spdkClient)
shuo-wu marked this conversation as resolved.
Show resolved Hide resolved
}

func (e *Engine) waitForRestoreComplete() error {
periodicChecker := time.NewTicker(time.Duration(restorePeriodicRefreshInterval.Seconds()) * time.Second)
defer periodicChecker.Stop()

var err error
for range periodicChecker.C {
isReplicaRestoreCompleted := true
for replicaName, replicaAddress := range e.ReplicaAddressMap {
c3y1huang marked this conversation as resolved.
Show resolved Hide resolved
if e.ReplicaModeMap[replicaName] != types.ModeRW {
continue
}

isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaAddress)
if err != nil {
return errors.Wrapf(err, "failed to check replica %s restore status", replicaName)
}

if !isReplicaRestoreCompleted {
break
}
}

if isReplicaRestoreCompleted {
e.log.Info("Backup restoration completed successfully")
return nil
}
}

return errors.Errorf("failed to wait for engine %s restore complete", e.Name)
}

func (e *Engine) isReplicaRestoreCompleted(replicaName, replicaAddress string) (bool, error) {
log := e.log.WithFields(logrus.Fields{
"replica": replicaName,
"address": replicaAddress,
})
log.Trace("Checking replica restore status")

replicaServiceCli, err := GetServiceClient(replicaAddress)
if err != nil {
return false, errors.Wrapf(err, "failed to get replica %v service client %s", replicaName, replicaAddress)
}
defer replicaServiceCli.Close()

status, err := replicaServiceCli.ReplicaRestoreStatus(replicaName)
if err != nil {
return false, errors.Wrapf(err, "failed to check replica %s restore status", replicaName)
}

return !status.IsRestoring, nil
}

func (e *Engine) BackupRestoreFinish(spdkClient *spdkclient.Client) error {
e.Lock()
defer e.Unlock()
Expand Down
96 changes: 80 additions & 16 deletions pkg/spdk/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -1926,21 +1926,25 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
lvolName := GetReplicaSnapshotLvolName(r.Name, snapshotName)
r.restore, err = NewRestore(spdkClient, lvolName, snapshotName, backupUrl, backupName, r)
if err != nil {
err = errors.Wrapf(err, "failed to start new restore")
err = errors.Wrap(err, "failed to start new restore")
return grpcstatus.Errorf(grpccodes.Internal, err.Error())
}
} else {
r.log.Infof("Resetting the restore for backup %v", backupUrl)

var lvolName string
var snapshotNameToBeRestored string

validLastRestoredBackup := r.canDoIncrementalRestore(restore, backupUrl, backupName)
if validLastRestoredBackup {
lvolName = GetReplicaSnapshotLvolName(r.Name, restore.LastRestored)
snapshotNameToBeRestored = restore.LastRestored
r.log.Infof("Starting an incremental restore for backup %v", backupUrl)
} else {
lvolName = GetReplicaSnapshotLvolName(r.Name, snapshotName)
snapshotNameToBeRestored = snapshotName
r.log.Infof("Starting a full restore for backup %v", backupUrl)
}

lvolName = GetReplicaSnapshotLvolName(r.Name, snapshotName)
snapshotNameToBeRestored = snapshotName

r.restore.StartNewRestore(backupUrl, backupName, lvolName, snapshotNameToBeRestored, validLastRestoredBackup)
}

Expand All @@ -1953,26 +1957,53 @@ func (r *Replica) BackupRestore(spdkClient *spdkclient.Client, backupUrl, snapsh
}
}()

if newRestore.LastRestored == "" {
isFullRestore := newRestore.LastRestored == ""

defer func() {
go func() {
if err := r.completeBackupRestore(spdkClient, isFullRestore); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
}
}()
}()

if isFullRestore {
r.log.Infof("Starting a new full restore for backup %v", backupUrl)
if err := r.backupRestore(backupUrl, newRestore.LvolName, concurrentLimit); err != nil {
return errors.Wrapf(err, "failed to start full backup restore")
}
r.log.Infof("Successfully initiated full restore for %v to %v", backupUrl, newRestore.LvolName)
} else {
return fmt.Errorf("incremental restore is not supported yet")
}

go func() {
if err := r.completeBackupRestore(spdkClient); err != nil {
logrus.WithError(err).Warn("Failed to complete backup restore")
r.log.Infof("Starting an incremental restore for backup %v", backupUrl)
if err := r.backupRestoreIncrementally(backupUrl, newRestore.LastRestored, newRestore.LvolName, concurrentLimit); err != nil {
return errors.Wrapf(err, "failed to start incremental backup restore")
}
}()
r.log.Infof("Successfully initiated incremental restore for %v to %v", backupUrl, newRestore.LvolName)
}

return nil

}

func (r *Replica) backupRestoreIncrementally(backupURL, lastRestored, snapshotLvolName string, concurrentLimit int32) error {
backupURL = butil.UnescapeURL(backupURL)

logrus.WithFields(logrus.Fields{
"backupURL": backupURL,
"lastRestored": lastRestored,
"snapshotLvolName": snapshotLvolName,
"concurrentLimit": concurrentLimit,
}).Info("Start restoring backup incrementally")

return backupstore.RestoreDeltaBlockBackupIncrementally(r.ctx, &backupstore.DeltaRestoreConfig{
BackupURL: backupURL,
DeltaOps: r.restore,
LastBackupName: lastRestored,
Filename: snapshotLvolName,
ConcurrentLimit: int32(concurrentLimit),
})
}

func (r *Replica) backupRestore(backupURL, snapshotLvolName string, concurrentLimit int32) error {
backupURL = butil.UnescapeURL(backupURL)

Expand Down Expand Up @@ -2002,7 +2033,7 @@ func (r *Replica) canDoIncrementalRestore(restore *Restore, backupURL, requested
return true
}

func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client) (err error) {
func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client, isFullRestore bool) (err error) {
defer func() {
if extraErr := r.finishRestore(err); extraErr != nil {
r.log.WithError(extraErr).Error("Failed to finish backup restore")
Expand All @@ -2017,9 +2048,11 @@ func (r *Replica) completeBackupRestore(spdkClient *spdkclient.Client) (err erro
restore := r.restore.DeepCopy()
r.RUnlock()

// TODO: Support postIncrementalRestoreOperations
if isFullRestore {
return r.postFullRestoreOperations(spdkClient, restore)
}

return r.postFullRestoreOperations(spdkClient, restore)
return r.postIncrementalRestoreOperations(spdkClient, restore)
}

func (r *Replica) waitForRestoreComplete() error {
Expand Down Expand Up @@ -2050,6 +2083,37 @@ func (r *Replica) waitForRestoreComplete() error {
return nil
}

func (r *Replica) postIncrementalRestoreOperations(spdkClient *spdkclient.Client, restore *Restore) error {
r.log.Infof("Replacing snapshot %v of the restored volume", restore.SnapshotName)

if r.restore.State == btypes.ProgressStateCanceled {
r.log.Info("Doing nothing for canceled backup restoration")
return nil
}

// Delete snapshot; SPDK will coalesce the content into the current head lvol.
r.log.Infof("Deleting snapshot %v for snapshot replacement of the restored volume", restore.SnapshotName)
_, err := r.SnapshotDelete(spdkClient, restore.SnapshotName)
shuo-wu marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
r.log.WithError(err).Error("Failed to delete snapshot of the restored volume")
return errors.Wrapf(err, "failed to delete snapshot of the restored volume")
}

r.log.Infof("Creating snapshot %v for snapshot replacement of the restored volume", restore.SnapshotName)
opts := &api.SnapshotOptions{
UserCreated: false,
Timestamp: util.Now(),
}
_, err = r.SnapshotCreate(spdkClient, restore.SnapshotName, opts)
if err != nil {
r.log.WithError(err).Error("Failed to take snapshot of the restored volume")
return errors.Wrapf(err, "failed to take snapshot of the restored volume")
}

r.log.Infof("Done running incremental restore %v to lvol %v", restore.BackupURL, restore.LvolName)
return nil
}

func (r *Replica) postFullRestoreOperations(spdkClient *spdkclient.Client, restore *Restore) error {
if r.restore.State == btypes.ProgressStateCanceled {
r.log.Info("Doing nothing for canceled backup restoration")
Expand Down
2 changes: 1 addition & 1 deletion pkg/spdk/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (r *Restore) OpenVolumeDev(volDevName string) (*os.File, string, error) {
if err != nil {
return nil, "", errors.Wrapf(err, "failed to create NVMe initiator for lvol bdev %v", lvolName)
}
if _, err := initiator.Start(r.ip, strconv.Itoa(int(r.port)), false); err != nil {
if _, err := initiator.Start(r.ip, strconv.Itoa(int(r.port)), true); err != nil {
return nil, "", errors.Wrapf(err, "failed to start NVMe initiator for lvol bdev %v", lvolName)
}
r.initiator = initiator
Expand Down
21 changes: 0 additions & 21 deletions pkg/spdk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,27 +1359,6 @@ func (s *Server) ReplicaBackupRestore(ctx context.Context, req *spdkrpc.ReplicaB
return &emptypb.Empty{}, nil
}

func (s *Server) EngineBackupRestoreFinish(ctx context.Context, req *spdkrpc.EngineBackupRestoreFinishRequest) (ret *emptypb.Empty, err error) {
logrus.WithFields(logrus.Fields{
"engine": req.EngineName,
}).Info("Finishing backup restoration")

s.RLock()
e := s.engineMap[req.EngineName]
s.RUnlock()

if e == nil {
return nil, grpcstatus.Errorf(grpccodes.NotFound, "cannot find engine %v for finishing backup restoration", req.EngineName)
}

err = e.BackupRestoreFinish(s.spdkClient)
if err != nil {
err = errors.Wrapf(err, "failed to finish backup restoration for engine %v", req.EngineName)
return nil, grpcstatus.Errorf(grpccodes.Internal, err.Error())
}
return &emptypb.Empty{}, nil
}

func (s *Server) EngineRestoreStatus(ctx context.Context, req *spdkrpc.RestoreStatusRequest) (*spdkrpc.RestoreStatusResponse, error) {
s.RLock()
e := s.engineMap[req.EngineName]
Expand Down
Loading