From dc1ec5a9faa82a77d88c0c82750b4dca79d2b612 Mon Sep 17 00:00:00 2001 From: Chin-Ya Huang Date: Thu, 19 Sep 2024 13:21:49 +0800 Subject: [PATCH 1/2] chore: update vendors longhorn/longhorn-8430 Signed-off-by: Chin-Ya Huang --- go.mod | 2 +- go.sum | 4 +- .../longhorn-spdk-engine/pkg/client/client.go | 3 +- .../longhorn-spdk-engine/pkg/spdk/engine.go | 494 ++++++++++-------- .../longhorn-spdk-engine/pkg/spdk/replica.go | 58 +- .../longhorn-spdk-engine/pkg/spdk/server.go | 2 +- vendor/modules.txt | 2 +- 7 files changed, 341 insertions(+), 224 deletions(-) diff --git a/go.mod b/go.mod index fc3904f9f..2ab84538a 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/longhorn/go-common-libs v0.0.0-20241102040927-43901cc2fa33 github.com/longhorn/go-spdk-helper v0.0.0-20241103044742-606c0ee8d532 github.com/longhorn/longhorn-engine v1.8.0-dev-20241103 - github.com/longhorn/longhorn-spdk-engine v0.0.0-20241023025831-ecc7b8a48d56 + github.com/longhorn/longhorn-spdk-engine v0.0.0-20241106035350-4642db182def github.com/longhorn/types v0.0.0-20241101010532-9e901229a935 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.3 diff --git a/go.sum b/go.sum index 3794f073e..dbf99fd3c 100644 --- a/go.sum +++ b/go.sum @@ -115,8 +115,8 @@ github.com/longhorn/go-spdk-helper v0.0.0-20241103044742-606c0ee8d532 h1:hAPa1Uc github.com/longhorn/go-spdk-helper v0.0.0-20241103044742-606c0ee8d532/go.mod h1:hMcAO/kxvkAjW/nL7mYsNonJUQhFSh0rMMQ/A/PjTLI= github.com/longhorn/longhorn-engine v1.8.0-dev-20241103 h1:Q3RKmiD9SlxJ9bMRA+Rs0gbEFzQek2zkOjkbnuVpcPQ= github.com/longhorn/longhorn-engine v1.8.0-dev-20241103/go.mod h1:7c7M3uig+IpM252ewuQ3VZIRIWtwuLOOTBmvLm61QZw= -github.com/longhorn/longhorn-spdk-engine v0.0.0-20241023025831-ecc7b8a48d56 h1:R0mtag2TpUqkiPJAQQRj+wnjmr3xvMB3AVH7F39mhho= -github.com/longhorn/longhorn-spdk-engine v0.0.0-20241023025831-ecc7b8a48d56/go.mod h1:lkFlcoa5ZEYH1ufC+sHdG2jd6gIlhtg129L2Uczf6kU= +github.com/longhorn/longhorn-spdk-engine v0.0.0-20241106035350-4642db182def h1:1LmVa0lk1w/zKj9lqEJS5ZdRXGlBK6AIETYCwdbtlbk= +github.com/longhorn/longhorn-spdk-engine v0.0.0-20241106035350-4642db182def/go.mod h1:uGfvDuUPjW4FIfwiNEH1yJA1CjRoDa6g8ndTPJezGdg= github.com/longhorn/sparse-tools v0.0.0-20241023025917-7951cd783270 h1:F13lddDaeUX8dBwRqOT/aXtb2C1szwqIFgW4KpZgCGw= github.com/longhorn/sparse-tools v0.0.0-20241023025917-7951cd783270/go.mod h1:iUJCZtOKG/9xv2rfrUAYZntFTzP5dZtvy4Kwe6dMcUc= github.com/longhorn/types v0.0.0-20241101010532-9e901229a935 h1:s6ngry7kCUdggXRKywHdwt98vjbOZQX8Txq166hxph0= diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/client/client.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/client/client.go index 0c3f2bcc9..b67f20ea5 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/client/client.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/client/client.go @@ -471,7 +471,7 @@ func (c *SPDKClient) ReplicaRebuildingDstSnapshotRevert(name, snapshotName strin } func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize uint64, replicaAddressMap map[string]string, portCount int32, - initiatorAddress, targetAddress string, upgradeRequired bool) (*api.Engine, error) { + initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (*api.Engine, error) { if name == "" || volumeName == "" || len(replicaAddressMap) == 0 { return nil, fmt.Errorf("failed to start SPDK engine: missing required parameters") } @@ -490,6 +490,7 @@ func (c *SPDKClient) EngineCreate(name, volumeName, frontend string, specSize ui UpgradeRequired: upgradeRequired, TargetAddress: targetAddress, InitiatorAddress: initiatorAddress, + SalvageRequested: salvageRequested, }) if err != nil { return nil, errors.Wrap(err, "failed to start SPDK engine") diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go index 8146a1a25..6c79c7fff 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/engine.go @@ -47,10 +47,7 @@ type Engine struct { Nqn string Nguid string - // TODO: Use a single map to store all replica info - ReplicaAddressMap map[string]string - ReplicaBdevNameMap map[string]string - ReplicaModeMap map[string]types.Mode + ReplicaStatusMap map[string]*EngineReplicaStatus initiator *nvme.Initiator dmDeviceBusy bool @@ -70,6 +67,12 @@ type Engine struct { log logrus.FieldLogger } +type EngineReplicaStatus struct { + Address string + BdevName string + Mode types.Mode +} + func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineUpdateCh chan interface{}) *Engine { log := logrus.StandardLogger().WithFields(logrus.Fields{ "engineName": engineName, @@ -84,13 +87,12 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU log.WithField("specSize", roundedSpecSize) return &Engine{ - Name: engineName, - VolumeName: volumeName, - Frontend: frontend, - SpecSize: specSize, - ReplicaAddressMap: map[string]string{}, - ReplicaBdevNameMap: map[string]string{}, - ReplicaModeMap: map[string]types.Mode{}, + Name: engineName, + VolumeName: volumeName, + Frontend: frontend, + SpecSize: specSize, + + ReplicaStatusMap: map[string]*EngineReplicaStatus{}, State: types.InstanceStatePending, @@ -102,13 +104,14 @@ func NewEngine(engineName, volumeName, frontend string, specSize uint64, engineU } } -func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired bool) (ret *spdkrpc.Engine, err error) { +func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[string]string, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorAddress, targetAddress string, upgradeRequired, salvageRequested bool) (ret *spdkrpc.Engine, err error) { logrus.WithFields(logrus.Fields{ "portCount": portCount, "upgradeRequired": upgradeRequired, "replicaAddressMap": replicaAddressMap, "initiatorAddress": initiatorAddress, "targetAddress": targetAddress, + "salvageRequested": salvageRequested, }).Info("Creating engine") requireUpdate := true @@ -190,25 +193,36 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str initiatorCreationRequired = false } + if salvageRequested { + e.log.Info("Requesting salvage for engine replicas") + + replicaAddressMap, err = e.filterSalvageCandidates(replicaAddressMap) + if err != nil { + return nil, errors.Wrapf(err, "failed to update replica mode to filter salvage candidates") + } + } + for replicaName, replicaAddr := range replicaAddressMap { + e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ + Address: replicaAddr, + } + bdevName, err := connectNVMfBdev(spdkClient, replicaName, replicaAddr) if err != nil { - e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode from %v to ERR and continue", replicaName, replicaAddr, e.ReplicaModeMap[replicaName]) - e.ReplicaModeMap[replicaName] = types.ModeERR - e.ReplicaBdevNameMap[replicaName] = "" + e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) + e.ReplicaStatusMap[replicaName].Mode = types.ModeERR } else { // TODO: Check if a replica is really a RW replica rather than a rebuilding failed replica - e.ReplicaModeMap[replicaName] = types.ModeRW - e.ReplicaBdevNameMap[replicaName] = bdevName + e.ReplicaStatusMap[replicaName].Mode = types.ModeRW + e.ReplicaStatusMap[replicaName].BdevName = bdevName + replicaBdevList = append(replicaBdevList, bdevName) } - replicaBdevList = append(replicaBdevList, bdevName) } - e.ReplicaAddressMap = replicaAddressMap - e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) + e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) e.checkAndUpdateInfoFromReplicaNoLock() - e.log.Infof("Connected all available replicas %+v, then launching raid during engine creation", e.ReplicaModeMap) + e.log.Infof("Tried to connected all replicas %+v, then launching raid during engine creation", e.ReplicaStatusMap) if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil { return nil, err } @@ -241,22 +255,20 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str } for replicaName, replicaAddr := range replicaAddressMap { - _, ok := engineWithTarget.ReplicaAddressMap[replicaName] - if !ok { - e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s, will mark the mode from %v to ERR and continue", replicaName, replicaAddr, e.ReplicaModeMap[replicaName]) - e.ReplicaModeMap[replicaName] = types.ModeERR - e.ReplicaBdevNameMap[replicaName] = "" + e.ReplicaStatusMap[replicaName] = &EngineReplicaStatus{ + Address: replicaAddr, + } + if _, ok := engineWithTarget.ReplicaAddressMap[replicaName]; !ok { + e.log.WithError(err).Warnf("Failed to get bdev from replica %s with address %s during creation, will mark the mode to ERR and continue", replicaName, replicaAddr) + e.ReplicaStatusMap[replicaName].Mode = types.ModeERR } else { - e.ReplicaModeMap[replicaName] = types.ModeRW - e.ReplicaBdevNameMap[replicaName] = replicaName + // TODO: Check if a replica is really a RW replica rather than a rebuilding failed replica + e.ReplicaStatusMap[replicaName].Mode = types.ModeRW + e.ReplicaStatusMap[replicaName].BdevName = replicaName } - - replicaBdevList = append(replicaBdevList, replicaName) } - - e.ReplicaAddressMap = replicaAddressMap - e.log = e.log.WithField("replicaAddressMap", replicaAddressMap) - e.log.Infof("Re-connected all available replicas %+v for engine reconstruction during upgrade", e.ReplicaModeMap) + e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) + e.log.Infof("Tried to re-connected all replicas %+v for engine reconstruction during upgrade", e.ReplicaStatusMap) } e.log.Info("Launching frontend during engine creation") @@ -271,6 +283,80 @@ func (e *Engine) Create(spdkClient *spdkclient.Client, replicaAddressMap map[str return e.getWithoutLock(), nil } +// filterSalvageCandidates updates the replicaAddressMap by retaining only replicas +// eligible for salvage based on the largest volume head size. +// +// It iterates through all replicas and: +// - Retrieves the volume head size for each replica. +// - Identifies replicas with the largest volume head size as salvage candidates. +// - Remove the replicas that are not eligible as salvage candidates. +func (e *Engine) filterSalvageCandidates(replicaAddressMap map[string]string) (map[string]string, error) { + // Initialize filteredCandidates to hold a copy of replicaAddressMap. + filteredCandidates := map[string]string{} + for key, value := range replicaAddressMap { + filteredCandidates[key] = value + } + + volumeHeadSizeToReplicaNames := map[uint64][]string{} + + // Collect volume head size for each replica. + for replicaName, replicaAddress := range replicaAddressMap { + func() { + // Get service client for the current replica. + replicaServiceCli, err := GetServiceClient(replicaAddress) + if err != nil { + e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica service client", replicaName, replicaAddress) + return + } + + defer func() { + if errClose := replicaServiceCli.Close(); errClose != nil { + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during salvage candidate filtering", replicaName, replicaAddress) + } + }() + + // Retrieve replica information. + replica, err := replicaServiceCli.ReplicaGet(replicaName) + if err != nil { + e.log.WithError(err).Warnf("Skipping salvage for replica %s with address %s due to failed to get replica info", replicaName, replicaAddress) + delete(filteredCandidates, replicaName) + return + } + + // Map volume head size to replica names. + volumeHeadSizeToReplicaNames[replica.Head.ActualSize] = append(volumeHeadSizeToReplicaNames[replica.Head.ActualSize], replicaName) + }() + } + + // Sort the volume head sizes to find the largest. + volumeHeadSizeSorted, err := commonutils.SortKeys(volumeHeadSizeToReplicaNames) + if err != nil { + return nil, errors.Wrap(err, "failed to sort keys of salvage candidate by volume head size") + } + + if len(volumeHeadSizeSorted) == 0 { + return nil, errors.New("failed to find any salvage candidate with volume head size") + } + + // Determine salvage candidates with the largest volume head size. + largestVolumeHeadSize := volumeHeadSizeSorted[len(volumeHeadSizeSorted)-1] + e.log.Infof("Selecting salvage candidates with the largest volume head size %v from %+v", largestVolumeHeadSize, volumeHeadSizeToReplicaNames) + + // Filter out replicas that do not match the largest volume head size. + salvageCandidates := volumeHeadSizeToReplicaNames[largestVolumeHeadSize] + for replicaName := range replicaAddressMap { + if !commonutils.Contains(salvageCandidates, replicaName) { + e.log.Infof("Skipping salvage for replica %s with address %s due to not having the largest volume head size (%v)", replicaName, replicaAddressMap[replicaName]) + delete(filteredCandidates, replicaName) + continue + } + + e.log.Infof("Including replica %s as a salvage candidate", replicaName) + } + + return filteredCandidates, nil +} + func (e *Engine) handleFrontend(spdkClient *spdkclient.Client, portCount int32, superiorPortAllocator *commonbitmap.Bitmap, initiatorCreationRequired, upgradeRequired bool, initiatorAddress, targetAddress string) (err error) { if !types.IsFrontendSupported(e.Frontend) { return fmt.Errorf("unknown frontend type %s", e.Frontend) @@ -442,19 +528,16 @@ func (e *Engine) Delete(spdkClient *spdkclient.Client, superiorPortAllocator *co return err } - for replicaName := range e.ReplicaAddressMap { - if err := disconnectNVMfBdev(spdkClient, e.ReplicaBdevNameMap[replicaName]); err != nil { - if e.ReplicaModeMap[replicaName] != types.ModeERR { - e.ReplicaModeMap[replicaName] = types.ModeERR - e.log.WithError(err).Errorf("Engine failed to disconnect replica %s with bdev %s during deletion, will update the mode to ERR", replicaName, e.ReplicaBdevNameMap[replicaName]) + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if err := disconnectNVMfBdev(spdkClient, replicaStatus.BdevName); err != nil { + if replicaStatus.Mode != types.ModeERR { + e.log.WithError(err).Errorf("Engine failed to disconnect replica %s with bdev %s during deletion, will update the mode from %v to ERR", replicaName, replicaStatus.BdevName, replicaStatus.Mode) + replicaStatus.Mode = types.ModeERR requireUpdate = true } return err } - - delete(e.ReplicaAddressMap, replicaName) - delete(e.ReplicaBdevNameMap, replicaName) - delete(e.ReplicaModeMap, replicaName) + delete(e.ReplicaStatusMap, replicaName) requireUpdate = true } @@ -475,7 +558,7 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) { Name: e.Name, SpecSize: e.SpecSize, ActualSize: e.ActualSize, - ReplicaAddressMap: e.ReplicaAddressMap, + ReplicaAddressMap: map[string]string{}, ReplicaModeMap: map[string]spdkrpc.ReplicaMode{}, Ip: e.IP, Port: e.Port, @@ -488,8 +571,9 @@ func (e *Engine) getWithoutLock() (res *spdkrpc.Engine) { ErrorMsg: e.ErrorMsg, } - for replicaName, replicaMode := range e.ReplicaModeMap { - res.ReplicaModeMap[replicaName] = types.ReplicaModeToGRPCReplicaMode(replicaMode) + for replicaName, replicaStatus := range e.ReplicaStatusMap { + res.ReplicaAddressMap[replicaName] = replicaStatus.Address + res.ReplicaModeMap[replicaName] = types.ReplicaModeToGRPCReplicaMode(replicaStatus.Mode) } res.Head = api.LvolToProtoLvol(e.Head) for snapshotName, snapApiLvol := range e.SnapshotMap { @@ -571,69 +655,38 @@ func (e *Engine) ValidateAndUpdate(spdkClient *spdkclient.Client) (err error) { return fmt.Errorf("found mismatching between engine spec size %d and actual raid bdev size %d for engine %s", e.SpecSize, bdevRaidSize, e.Name) } - // Verify engine map consistency - for replicaName := range e.ReplicaAddressMap { - if _, exists := e.ReplicaBdevNameMap[replicaName]; !exists { - e.ReplicaModeMap[replicaName] = types.ModeERR - e.ReplicaBdevNameMap[replicaName] = "" - e.log.Errorf("Engine marked replica %s mode from %v to ERR since it is not found in engine %s bdev name map during ValidateAndUpdate", replicaName, e.ReplicaModeMap[replicaName], e.Name) - } - if _, exists := e.ReplicaModeMap[replicaName]; !exists { - e.ReplicaModeMap[replicaName] = types.ModeERR - e.log.Errorf("Engine marked replica %s mode from %v to ERR since it is not found in engine %s mode map during ValidateAndUpdate", replicaName, e.ReplicaModeMap[replicaName], e.Name) - } - } - for replicaName := range e.ReplicaBdevNameMap { - if _, exists := e.ReplicaAddressMap[replicaName]; !exists { - e.ReplicaAddressMap[replicaName] = "" - e.ReplicaModeMap[replicaName] = types.ModeERR - e.log.Errorf("Engine marked replica %s mode from %v to ERR since it is not found in engine %s address map during ValidateAndUpdate", replicaName, e.ReplicaModeMap[replicaName], e.Name) - } - if _, exists := e.ReplicaModeMap[replicaName]; !exists { - e.ReplicaModeMap[replicaName] = types.ModeERR - e.log.Errorf("Engine marked replica %s mode from %v to ERR since it is not found in engine %s mode map during ValidateAndUpdate", replicaName, e.ReplicaModeMap[replicaName], e.Name) - } - } - // Now e.ReplicaAddressMap and e.ReplicaBdevNameMap should have the same key set - for replicaName := range e.ReplicaModeMap { - if _, exists := e.ReplicaAddressMap[replicaName]; !exists { - delete(e.ReplicaModeMap, replicaName) - e.log.Errorf("Engine removed replica %s for the mode map since it is not found in engine %s address map during ValidateAndUpdate", replicaName, e.Name) - } - if _, exists := e.ReplicaBdevNameMap[replicaName]; !exists { - delete(e.ReplicaBdevNameMap, replicaName) - e.log.Errorf("Engine removed replica %s for the mode map since it is not found in engine %s bdev name map during ValidateAndUpdate", replicaName, e.Name) - } - } - e.log = e.log.WithField("replicaAddressMap", e.ReplicaAddressMap) - + // Verify replica status map containValidReplica := false - for replicaName, bdevName := range e.ReplicaBdevNameMap { - if e.ReplicaModeMap[replicaName] == types.ModeERR { - continue - } - if e.ReplicaModeMap[replicaName] != types.ModeWO && e.ReplicaModeMap[replicaName] != types.ModeRW { - e.log.Errorf("Engine found replica %s invalid mode %v during ValidateAndUpdate", replicaName, e.ReplicaModeMap[replicaName]) - e.ReplicaModeMap[replicaName] = types.ModeERR - updateRequired = true - continue + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Address == "" || replicaStatus.BdevName == "" { + if replicaStatus.Mode != types.ModeERR { + e.log.Errorf("Engine marked replica %s mode from %v to ERR since its address %s or bdev name %s is empty during ValidateAndUpdate", replicaName, replicaStatus.Mode, replicaStatus.Address, replicaStatus.BdevName) + replicaStatus.Mode = types.ModeERR + updateRequired = true + } } - mode, err := e.validateAndUpdateReplicaNvme(replicaName, bdevMap[bdevName]) - if err != nil { - e.log.WithError(err).Errorf("Engine found valid nvme for replica %v, will update the mode from %s to ERR during ValidateAndUpdate", replicaName, e.ReplicaModeMap[replicaName]) - e.ReplicaModeMap[replicaName] = types.ModeERR + if replicaStatus.Mode != types.ModeRW && replicaStatus.Mode != types.ModeWO && replicaStatus.Mode != types.ModeERR { + e.log.Errorf("Engine found replica %s invalid mode %v during ValidateAndUpdate", replicaName, replicaStatus.Mode) + replicaStatus.Mode = types.ModeERR updateRequired = true - continue } - if e.ReplicaModeMap[replicaName] != mode { - e.log.Infof("Engine updated replica %s mode from %v to %v during ValidateAndUpdate", replicaName, e.ReplicaModeMap[replicaName], mode) - e.ReplicaModeMap[replicaName] = mode - updateRequired = true + if replicaStatus.Mode != types.ModeERR { + mode, err := e.validateAndUpdateReplicaNvme(replicaName, bdevMap[replicaStatus.BdevName]) + if err != nil { + e.log.WithError(err).Errorf("Engine found valid nvme for replica %v, will update the mode from %s to ERR during ValidateAndUpdate", replicaName, replicaStatus.Mode) + replicaStatus.Mode = types.ModeERR + updateRequired = true + } else if replicaStatus.Mode != mode { + replicaStatus.Mode = mode + updateRequired = true + } } - if e.ReplicaModeMap[replicaName] == types.ModeRW { + if replicaStatus.Mode == types.ModeRW { containValidReplica = true } } + e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) + if !containValidReplica { e.State = types.InstanceStateError e.log.Error("Engine had no RW replica found at the end of ValidateAndUpdate, will be marked as error") @@ -652,45 +705,45 @@ func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() { // hasBackingImage := false hasSnapshot := false - for replicaName, address := range e.ReplicaAddressMap { - if e.ReplicaModeMap[replicaName] != types.ModeRW && e.ReplicaModeMap[replicaName] != types.ModeWO { - if e.ReplicaModeMap[replicaName] != types.ModeERR { - e.log.Warnf("Engine found unexpected mode for replica %s with address %s during info update from replica, mark the mode from %v to ERR and continue info update for other replicas", replicaName, address, e.ReplicaModeMap[replicaName]) - e.ReplicaModeMap[replicaName] = types.ModeERR + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode != types.ModeRW && replicaStatus.Mode != types.ModeWO { + if replicaStatus.Mode != types.ModeERR { + e.log.Warnf("Engine found unexpected mode for replica %s with address %s during info update from replica, mark the mode from %v to ERR and continue info update for other replicas", replicaName, replicaStatus.Address, replicaStatus.Mode) + replicaStatus.Mode = types.ModeERR } continue } // Ensure the replica is not rebuilding func() { - replicaServiceCli, err := GetServiceClient(address) + replicaServiceCli, err := GetServiceClient(replicaStatus.Address) if err != nil { - e.log.WithError(err).Errorf("Failed to get service client for replica %s with address %s, will skip this replica and continue info update for other replicas", replicaName, address) + e.log.WithError(err).Errorf("Engine failed to get service client for replica %s with address %s, will skip this replica and continue info update for other replicas", replicaName, replicaStatus.Address) return } defer func() { if errClose := replicaServiceCli.Close(); errClose != nil { - e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during check and update info from replica", replicaName, address) + e.log.WithError(errClose).Errorf("Engine failed to close replica %s client with address %s during check and update info from replica", replicaName, replicaStatus.Address) } }() replica, err := replicaServiceCli.ReplicaGet(replicaName) if err != nil { - e.log.WithError(err).Warnf("Failed to get replica %s with address %s, mark the mode from %v to ERR", replicaName, address, e.ReplicaModeMap[replicaName]) - e.ReplicaModeMap[replicaName] = types.ModeERR + e.log.WithError(err).Warnf("Engine failed to get replica %s with address %s, mark the mode from %v to ERR", replicaName, replicaStatus.Address, replicaStatus.Mode) + replicaStatus.Mode = types.ModeERR return } - if e.ReplicaModeMap[replicaName] == types.ModeWO { + if replicaStatus.Mode == types.ModeWO { shallowCopyStatus, err := replicaServiceCli.ReplicaRebuildingDstShallowCopyCheck(replicaName) if err != nil { - e.log.WithError(err).Warnf("Failed to get rebuilding replica %s shallow copy info, will skip this replica and continue info update for other replicas", replicaName) + e.log.WithError(err).Warnf("Engine failed to get rebuilding replica %s shallow copy info, will skip this replica and continue info update for other replicas", replicaName) return } if shallowCopyStatus.TotalState == helpertypes.ShallowCopyStateError || shallowCopyStatus.Error != "" { e.log.Errorf("Engine found rebuilding replica %s error %v during info update from replica, will mark the mode from WO to ERR and continue info update for other replicas", replicaName, shallowCopyStatus.Error) - e.ReplicaModeMap[replicaName] = types.ModeERR + replicaStatus.Mode = types.ModeERR } // No need to do anything if `shallowCopyStatus.TotalState == helpertypes.ShallowCopyStateComplete`, engine should leave the rebuilding logic to update its mode return @@ -716,13 +769,13 @@ func (e *Engine) checkAndUpdateInfoFromReplicaNoLock() { } } else { if hasSnapshot { - e.log.Warnf("Found replica %s does not have a snapshot while other replicas have during info update for other replicas", replicaName) + e.log.Warnf("Engine found replica %s does not have a snapshot while other replicas have during info update for other replicas", replicaName) } else { replicaAncestorMap[replicaName] = replica.Head } } if replicaAncestorMap[replicaName] == nil { - e.log.Warnf("Cannot find replica %s ancestor, will skip this replica and continue info update for other replicas", replicaName) + e.log.Warnf("Engine cannot find replica %s ancestor, will skip this replica and continue info update for other replicas", replicaName) return } replicaMap[replicaName] = replica @@ -877,15 +930,15 @@ func (e *Engine) validateAndUpdateReplicaNvme(replicaName string, bdev *spdktype return types.ModeERR, fmt.Errorf("found invalid address family %s and transport type %s in a remote nvme base bdev %s during replica %s mode validation", nvmeInfo.Trid.Adrfam, nvmeInfo.Trid.Trtype, bdev.Name, replicaName) } bdevAddr := net.JoinHostPort(nvmeInfo.Trid.Traddr, nvmeInfo.Trid.Trsvcid) - if e.ReplicaAddressMap[replicaName] != bdevAddr { - return types.ModeERR, fmt.Errorf("found mismatching between replica bdev %s address %s and the nvme bdev actual address %s during replica %s mode validation", bdev.Name, e.ReplicaAddressMap[replicaName], bdevAddr, replicaName) + if e.ReplicaStatusMap[replicaName].Address != bdevAddr { + return types.ModeERR, fmt.Errorf("found mismatching between replica bdev %s address %s and the nvme bdev actual address %s during replica %s mode validation", bdev.Name, e.ReplicaStatusMap[replicaName].Address, bdevAddr, replicaName) } - controllerName := helperutil.GetNvmeControllerNameFromNamespaceName(e.ReplicaBdevNameMap[replicaName]) + controllerName := helperutil.GetNvmeControllerNameFromNamespaceName(e.ReplicaStatusMap[replicaName].BdevName) if controllerName != replicaName { return types.ModeERR, fmt.Errorf("found unexpected the nvme bdev controller name %s (bdev name %s) during replica %s mode validation", controllerName, bdev.Name, replicaName) } - return e.ReplicaModeMap[replicaName], nil + return e.ReplicaStatusMap[replicaName].Mode, nil } func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstReplicaAddress string) (err error) { @@ -907,12 +960,12 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe return fmt.Errorf("invalid state %v for engine %s replica %s add start", e.State, e.Name, dstReplicaName) } - if _, exists := e.ReplicaAddressMap[dstReplicaName]; exists { + if _, exists := e.ReplicaStatusMap[dstReplicaName]; exists { return fmt.Errorf("replica %s already exists", dstReplicaName) } - for replicaName, replicaMode := range e.ReplicaModeMap { - if replicaMode == types.ModeWO { + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode == types.ModeWO { return fmt.Errorf("cannot add a new replica %s since there is already a rebuilding replica %s", dstReplicaName, replicaName) } } @@ -934,9 +987,8 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe } } if engineErr != nil || err != nil { - e.log.WithError(err).Errorf("Engine failed to start replica %s rebuilding, will mark the rebuilding replica mode from %v to ERR", dstReplicaName, e.ReplicaModeMap[dstReplicaName]) - e.ReplicaModeMap[dstReplicaName] = types.ModeERR - e.ReplicaBdevNameMap[dstReplicaName] = "" + e.log.WithError(err).Errorf("Engine failed to start replica %s rebuilding, will mark the rebuilding replica mode from %v to ERR", dstReplicaName, e.ReplicaStatusMap[dstReplicaName].Mode) + e.ReplicaStatusMap[dstReplicaName].Mode = types.ModeERR updateRequired = true } }() @@ -1034,14 +1086,17 @@ func (e *Engine) ReplicaAdd(spdkClient *spdkclient.Client, dstReplicaName, dstRe return errors.Wrapf(err, "failed to adding the rebuilding replica %s head bdev %s to the base bdev list for engine %s", dstReplicaName, dstHeadLvolBdevName, e.Name) } - e.ReplicaAddressMap[dstReplicaName] = dstReplicaAddress - e.ReplicaBdevNameMap[dstReplicaName] = dstHeadLvolBdevName + e.ReplicaStatusMap[dstReplicaName] = &EngineReplicaStatus{ + Address: dstReplicaAddress, + Mode: types.ModeWO, + BdevName: dstHeadLvolBdevName, + } + updateRequired = true // TODO: Mark the destination replica as WO mode here does not prevent the RAID bdev from using this. May need to have a SPDK API to control the corresponding base bdev mode. // Reading data from this dst replica is not a good choice as the flow will be more zigzag than reading directly from the src replica: // application -> RAID1 -> this base bdev (dest replica) -> the exposed snapshot (src replica). - e.ReplicaModeMap[dstReplicaName] = types.ModeWO - e.log = e.log.WithField("replicaAddressMap", e.ReplicaAddressMap) + e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) e.log.Infof("Engine started to rebuild replica %s from healthy replica %s", dstReplicaName, srcReplicaName) @@ -1086,9 +1141,9 @@ func (e *Engine) replicaShallowCopy(srcReplicaServiceCli, dstReplicaServiceCli * // Blindly mark the rebuilding replica as mode ERR now. if err != nil { e.Lock() - if e.ReplicaModeMap[dstReplicaName] != types.ModeERR { - e.ReplicaModeMap[dstReplicaName] = types.ModeERR - e.log.WithError(err).Errorf("Engine failed to do shallow copy from src replica %s to dst replica %s, will mark the rebuilding replica mode from %v to ERR", srcReplicaName, dstReplicaName, e.ReplicaModeMap[dstReplicaName]) + if e.ReplicaStatusMap[dstReplicaName] != nil && e.ReplicaStatusMap[dstReplicaName].Mode != types.ModeERR { + e.log.WithError(err).Errorf("Engine failed to do shallow copy from src replica %s to dst replica %s, will mark the rebuilding replica mode from %v to ERR", srcReplicaName, dstReplicaName, e.ReplicaStatusMap[dstReplicaName].Mode) + e.ReplicaStatusMap[dstReplicaName].Mode = types.ModeERR updateRequired = true } e.Unlock() @@ -1185,6 +1240,11 @@ func (e *Engine) replicaAddFinish(srcReplicaServiceCli, dstReplicaServiceCli *cl } }() + dstReplicaStatus := e.ReplicaStatusMap[dstReplicaName] + if dstReplicaStatus == nil { + return fmt.Errorf("cannot find the dst replica %s in the engine %s replica status map during replica add finish", dstReplicaName, e.Name) + } + // Blindly ask the source replica to detach the rebuilding lvol // If this detachment fails, there may be leftover rebuilding nvme controller in spdk_tgt of the src replica. We should continue since it's not a fatal error and shall not block the flow // Similarly, the below src/dst replica finish should not block the flow either. @@ -1207,13 +1267,13 @@ func (e *Engine) replicaAddFinish(srcReplicaServiceCli, dstReplicaServiceCli *cl // The destination replica will change the parent of the head to the newly rebuilt snapshot chain and detach the external snapshot. // Besides, it should clean up the attached rebuilding lvol if exists. - if e.ReplicaModeMap[dstReplicaName] == types.ModeWO { + if dstReplicaStatus.Mode == types.ModeWO { if dstReplicaErr := dstReplicaServiceCli.ReplicaRebuildingDstFinish(dstReplicaName); dstReplicaErr != nil { - e.log.WithError(dstReplicaErr).Errorf("Engine failed to finish rebuilding dst replica %s, will update the mode from %v to ERR then continue rebuilding src replica %s finish", dstReplicaName, e.ReplicaModeMap[dstReplicaName], srcReplicaName) - e.ReplicaModeMap[dstReplicaName] = types.ModeERR + e.log.WithError(dstReplicaErr).Errorf("Engine failed to finish rebuilding dst replica %s, will update the mode from %v to ERR then continue rebuilding src replica %s finish", dstReplicaName, dstReplicaStatus.Mode, srcReplicaName) + dstReplicaStatus.Mode = types.ModeERR } else { - e.log.Infof("Engine succeeded to finish rebuilding dst replica %s, will update the mode from %v to RW", dstReplicaName, e.ReplicaModeMap[dstReplicaName]) - e.ReplicaModeMap[dstReplicaName] = types.ModeRW + e.log.Infof("Engine succeeded to finish rebuilding dst replica %s, will update the mode from %v to RW", dstReplicaName, dstReplicaStatus.Mode) + dstReplicaStatus.Mode = types.ModeRW } updateRequired = true } @@ -1231,12 +1291,12 @@ func (e *Engine) replicaAddFinish(srcReplicaServiceCli, dstReplicaServiceCli *cl } func (e *Engine) getReplicaAddSrcReplica() (srcReplicaName, srcReplicaAddress string, err error) { - for replicaName, replicaMode := range e.ReplicaModeMap { - if replicaMode != types.ModeRW { + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode != types.ModeRW { continue } srcReplicaName = replicaName - srcReplicaAddress = e.ReplicaAddressMap[replicaName] + srcReplicaAddress = replicaStatus.Address break } if srcReplicaName == "" || srcReplicaAddress == "" { @@ -1285,8 +1345,8 @@ func (e *Engine) ReplicaDelete(spdkClient *spdkclient.Client, replicaName, repli defer e.Unlock() if replicaName == "" { - for rName, rAddr := range e.ReplicaAddressMap { - if rAddr == replicaAddress { + for rName, rStatus := range e.ReplicaStatusMap { + if rStatus.Address == replicaAddress { replicaName = rName break } @@ -1295,20 +1355,21 @@ func (e *Engine) ReplicaDelete(spdkClient *spdkclient.Client, replicaName, repli if replicaName == "" { return fmt.Errorf("cannot find replica name with address %s for engine %s replica delete", replicaAddress, e.Name) } - if e.ReplicaAddressMap[replicaName] == "" { - return fmt.Errorf("cannot find replica %s for engine %s replica delete", replicaName, e.Name) + replicaStatus := e.ReplicaStatusMap[replicaName] + if replicaStatus == nil { + return fmt.Errorf("cannot find replica %s from the replica status map for engine %s replica delete", replicaName, e.Name) } - if replicaAddress != "" && e.ReplicaAddressMap[replicaName] != replicaAddress { - return fmt.Errorf("replica %s recorded address %s does not match the input address %s for engine %s replica delete", replicaName, e.ReplicaAddressMap[replicaName], replicaAddress, e.Name) + if replicaAddress != "" && replicaStatus.Address != replicaAddress { + return fmt.Errorf("replica %s recorded address %s does not match the input address %s for engine %s replica delete", replicaName, replicaStatus.Address, replicaAddress, e.Name) } - e.log.Infof("Removing base bdev %v from engine", e.ReplicaBdevNameMap[replicaName]) - if _, err := spdkClient.BdevRaidRemoveBaseBdev(e.ReplicaBdevNameMap[replicaName]); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { - return errors.Wrapf(err, "failed to remove base bdev %s for deleting replica %s", e.ReplicaBdevNameMap[replicaName], replicaName) + e.log.Infof("Removing base bdev %v from engine", replicaStatus.BdevName) + if _, err := spdkClient.BdevRaidRemoveBaseBdev(replicaStatus.BdevName); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return errors.Wrapf(err, "failed to remove base bdev %s for deleting replica %s", replicaStatus.BdevName, replicaName) } - controllerName := helperutil.GetNvmeControllerNameFromNamespaceName(e.ReplicaBdevNameMap[replicaName]) - // Fallback to use replica name. Make sure there won't be a leftover controller even if somehow `e.ReplicaBdevNameMap[replicaName]` has no record + controllerName := helperutil.GetNvmeControllerNameFromNamespaceName(replicaStatus.BdevName) + // Fallback to use replica name. Make sure there won't be a leftover controller even if somehow `replicaStatus.BdevName` has no record if controllerName == "" { e.log.Infof("No NVMf controller found for replica %s, so fallback to use replica name %s", replicaName, replicaName) controllerName = replicaName @@ -1319,10 +1380,8 @@ func (e *Engine) ReplicaDelete(spdkClient *spdkclient.Client, replicaName, repli return errors.Wrapf(err, "failed to detach controller %s for deleting replica %s", controllerName, replicaName) } - delete(e.ReplicaAddressMap, replicaName) - delete(e.ReplicaModeMap, replicaName) - delete(e.ReplicaBdevNameMap, replicaName) - e.log = e.log.WithField("replicaAddressMap", e.ReplicaAddressMap) + delete(e.ReplicaStatusMap, replicaName) + e.log = e.log.WithField("replicaStatusMap", e.ReplicaStatusMap) return nil } @@ -1444,11 +1503,14 @@ func (e *Engine) snapshotOperation(spdkClient *spdkclient.Client, inputSnapshotN func (e *Engine) getReplicaClients() (replicaClients map[string]*client.SPDKClient, err error) { replicaClients = map[string]*client.SPDKClient{} - for replicaName := range e.ReplicaAddressMap { - if e.ReplicaModeMap[replicaName] == types.ModeERR { + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode != types.ModeRW && replicaStatus.Mode != types.ModeWO { + continue + } + if replicaStatus.Address == "" { continue } - c, err := GetServiceClient(e.ReplicaAddressMap[replicaName]) + c, err := GetServiceClient(replicaStatus.Address) if err != nil { return nil, err } @@ -1470,6 +1532,10 @@ func (e *Engine) closeRplicaClients(replicaClients map[string]*client.SPDKClient func (e *Engine) snapshotOperationPreCheckWithoutLock(replicaClients map[string]*client.SPDKClient, snapshotName string, snapshotOp SnapshotOperationType) (string, error) { for replicaName := range replicaClients { + replicaStatus := e.ReplicaStatusMap[replicaName] + if replicaStatus == nil { + return "", fmt.Errorf("cannot find replica %s in the engine %s replica status map before snapshot %s operation", replicaName, e.Name, snapshotName) + } switch snapshotOp { case SnapshotOperationCreate: if snapshotName == "" { @@ -1479,7 +1545,7 @@ func (e *Engine) snapshotOperationPreCheckWithoutLock(replicaClients map[string] if snapshotName == "" { return "", fmt.Errorf("empty snapshot name for engine %s snapshot deletion", e.Name) } - if e.ReplicaModeMap[replicaName] == types.ModeWO { + if replicaStatus.Mode == types.ModeWO { return "", fmt.Errorf("engine %s contains WO replica %s during snapshot %s delete", e.Name, replicaName, snapshotName) } e.checkAndUpdateInfoFromReplicaNoLock() @@ -1499,7 +1565,7 @@ func (e *Engine) snapshotOperationPreCheckWithoutLock(replicaClients map[string] if e.Frontend != types.FrontendEmpty { return "", fmt.Errorf("invalid frontend %v for engine %s snapshot %s revert", e.Frontend, e.Name, snapshotName) } - if e.ReplicaModeMap[replicaName] == types.ModeWO { + if replicaStatus.Mode == types.ModeWO { return "", fmt.Errorf("engine %s contains WO replica %s during snapshot %s revert", e.Name, replicaName, snapshotName) } r, err := replicaClients[replicaName].ReplicaGet(replicaName) @@ -1510,7 +1576,7 @@ func (e *Engine) snapshotOperationPreCheckWithoutLock(replicaClients map[string] return "", fmt.Errorf("replica %s does not contain the reverting snapshot %s", replicaName, snapshotName) } case SnapshotOperationPurge: - if e.ReplicaModeMap[replicaName] == types.ModeWO { + if replicaStatus.Mode == types.ModeWO { return "", fmt.Errorf("engine %s contains WO replica %s during snapshot purge", e.Name, replicaName) } // TODO: Do we need to verify that all replicas hold the same system snapshot list? @@ -1531,23 +1597,27 @@ func (e *Engine) snapshotOperationWithoutLock(spdkClient *spdkclient.Client, rep } for replicaName := range replicaClients { - if err := e.replicaSnapshotOperation(spdkClient, replicaClients[replicaName], replicaName, snapshotName, snapshotOp, opts); err != nil && e.ReplicaModeMap[replicaName] != types.ModeERR { - e.log.WithError(err).Errorf("Engine failed to issue operation %s for replica %s snapshot %s, will mark the replica mode from %v to ERR", snapshotOp, replicaName, snapshotName, e.ReplicaModeMap[replicaName]) - e.ReplicaModeMap[replicaName] = types.ModeERR + replicaStatus := e.ReplicaStatusMap[replicaName] + if replicaStatus == nil { + return false, fmt.Errorf("cannot find replica %s in the engine %s replica status map during snapshot %s operation", replicaName, e.Name, snapshotName) + } + if err := e.replicaSnapshotOperation(spdkClient, replicaClients[replicaName], replicaName, snapshotName, snapshotOp, opts); err != nil && replicaStatus.Mode != types.ModeERR { + e.log.WithError(err).Errorf("Engine failed to issue operation %s for replica %s snapshot %s, will mark the replica mode from %v to ERR", snapshotOp, replicaName, snapshotName, replicaStatus.Mode) + replicaStatus.Mode = types.ModeERR updated = true } } if snapshotOp == SnapshotOperationRevert { replicaBdevList := []string{} - for replicaName, bdevName := range e.ReplicaBdevNameMap { - if e.ReplicaModeMap[replicaName] != types.ModeRW { + for _, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode != types.ModeRW { continue } - if e.ReplicaBdevNameMap[replicaName] == "" { + if replicaStatus.BdevName == "" { continue } - replicaBdevList = append(replicaBdevList, bdevName) + replicaBdevList = append(replicaBdevList, replicaStatus.BdevName) } if _, err := spdkClient.BdevRaidCreate(e.Name, spdktypes.BdevRaidLevel1, 0, replicaBdevList); err != nil { e.log.WithError(err).Errorf("Failed to re-create RAID after snapshot %s revert", snapshotName) @@ -1566,20 +1636,24 @@ func (e *Engine) replicaSnapshotOperation(spdkClient *spdkclient.Client, replica case SnapshotOperationDelete: return replicaClient.ReplicaSnapshotDelete(replicaName, snapshotName) case SnapshotOperationRevert: - if err := disconnectNVMfBdev(spdkClient, e.ReplicaBdevNameMap[replicaName]); err != nil { + replicaStatus := e.ReplicaStatusMap[replicaName] + if replicaStatus == nil { + return fmt.Errorf("cannot find replica %s in the engine %s replica status map during snapshot %s operation", replicaName, e.Name, snapshotName) + } + if err := disconnectNVMfBdev(spdkClient, replicaStatus.BdevName); err != nil { return err } - delete(e.ReplicaBdevNameMap, replicaName) + replicaStatus.BdevName = "" // If the below step failed, the replica will be marked as ERR during ValidateAndUpdate. if err := replicaClient.ReplicaSnapshotRevert(replicaName, snapshotName); err != nil { return err } - bdevName, err := connectNVMfBdev(spdkClient, replicaName, e.ReplicaAddressMap[replicaName]) + bdevName, err := connectNVMfBdev(spdkClient, replicaName, replicaStatus.Address) if err != nil { return err } if bdevName != "" { - e.ReplicaBdevNameMap[replicaName] = bdevName + replicaStatus.BdevName = bdevName } case SnapshotOperationPurge: return replicaClient.ReplicaSnapshotPurge(replicaName) @@ -1596,23 +1670,23 @@ func (e *Engine) ReplicaList(spdkClient *spdkclient.Client) (ret map[string]*api replicas := map[string]*api.Replica{} - for name, address := range e.ReplicaAddressMap { - replicaServiceCli, err := GetServiceClient(address) + for name, replicaStatus := range e.ReplicaStatusMap { + replicaServiceCli, err := GetServiceClient(replicaStatus.Address) if err != nil { - e.log.WithError(err).Errorf("Failed to get service client for replica %s with address %s during list replicas", name, address) + e.log.WithError(err).Errorf("Failed to get service client for replica %s with address %s during list replicas", name, replicaStatus.Address) continue } func() { defer func() { if errClose := replicaServiceCli.Close(); errClose != nil { - e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during list replicas", name, address) + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during list replicas", name, replicaStatus.Address) } }() replica, err := replicaServiceCli.ReplicaGet(name) if err != nil { - e.log.WithError(err).Errorf("Failed to get replica %s with address %s", name, address) + e.log.WithError(err).Errorf("Failed to get replica %s with address %s", name, replicaStatus.Address) return } @@ -1649,12 +1723,12 @@ func (e *Engine) BackupCreate(backupName, volumeName, engineName, snapshotName, defer e.Unlock() replicaName, replicaAddress := "", "" - for name, mode := range e.ReplicaModeMap { - if mode != types.ModeRW { + for name, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode != types.ModeRW { continue } replicaName = name - replicaAddress = e.ReplicaAddressMap[name] + replicaAddress = replicaStatus.Address break } @@ -1700,9 +1774,9 @@ func (e *Engine) BackupStatus(backupName, replicaAddress string) (*spdkrpc.Backu defer e.Unlock() found := false - for name, mode := range e.ReplicaModeMap { - if e.ReplicaAddressMap[name] == replicaAddress { - if mode != types.ModeRW { + for name, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Address == replicaAddress { + if replicaStatus.Mode != types.ModeRW { return nil, grpcstatus.Errorf(grpccodes.Internal, "replica %s is not in RW mode", name) } found = true @@ -1739,11 +1813,12 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN } e.log.Info("Disconnecting all replicas before restoration") - for replicaName := range e.ReplicaAddressMap { - if err := disconnectNVMfBdev(spdkClient, e.ReplicaBdevNameMap[replicaName]); err != nil { + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if err := disconnectNVMfBdev(spdkClient, replicaStatus.BdevName); err != nil { e.log.Infof("Failed to remove replica %s before restoration", replicaName) return nil, errors.Wrapf(err, "failed to remove replica %s before restoration", replicaName) } + replicaStatus.BdevName = "" } e.IsRestoring = true @@ -1773,20 +1848,20 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN resp := &spdkrpc.EngineBackupRestoreResponse{ Errors: map[string]string{}, } - for replicaName, replicaAddress := range e.ReplicaAddressMap { - e.log.Infof("Restoring backup on replica %s address %s", replicaName, replicaAddress) + for replicaName, replicaStatus := range e.ReplicaStatusMap { + e.log.Infof("Restoring backup on replica %s address %s", replicaName, replicaStatus.Address) - replicaServiceCli, err := GetServiceClient(replicaAddress) + replicaServiceCli, err := GetServiceClient(replicaStatus.Address) if err != nil { - e.log.WithError(err).Errorf("Failed to restore backup on replica %s with address %s", replicaName, replicaAddress) - resp.Errors[replicaAddress] = err.Error() + e.log.WithError(err).Errorf("Failed to restore backup on replica %s with address %s", replicaName, replicaStatus.Address) + resp.Errors[replicaStatus.Address] = err.Error() continue } func() { defer func() { if errClose := replicaServiceCli.Close(); errClose != nil { - e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during restore backup", replicaName, replicaAddress) + e.log.WithError(errClose).Errorf("Failed to close replica %s client with address %s during restore backup", replicaName, replicaStatus.Address) } }() @@ -1798,8 +1873,8 @@ func (e *Engine) BackupRestore(spdkClient *spdkclient.Client, backupUrl, engineN ConcurrentLimit: concurrentLimit, }) if err != nil { - e.log.WithError(err).Errorf("Failed to restore backup on replica %s address %s", replicaName, replicaAddress) - resp.Errors[replicaAddress] = err.Error() + e.log.WithError(err).Errorf("Failed to restore backup on replica %s address %s", replicaName, replicaStatus.Address) + resp.Errors[replicaStatus.Address] = err.Error() } }() } @@ -1822,12 +1897,12 @@ func (e *Engine) waitForRestoreComplete() error { var err error for range periodicChecker.C { isReplicaRestoreCompleted := true - for replicaName, replicaAddress := range e.ReplicaAddressMap { - if e.ReplicaModeMap[replicaName] != types.ModeRW { + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode != types.ModeRW { continue } - isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaAddress) + isReplicaRestoreCompleted, err = e.isReplicaRestoreCompleted(replicaName, replicaStatus.Address) if err != nil { return errors.Wrapf(err, "failed to check replica %s restore status", replicaName) } @@ -1872,8 +1947,8 @@ func (e *Engine) BackupRestoreFinish(spdkClient *spdkclient.Client) error { defer e.Unlock() replicaBdevList := []string{} - for replicaName, bdevName := range e.ReplicaBdevNameMap { - replicaAddress := e.ReplicaAddressMap[replicaName] + for replicaName, replicaStatus := range e.ReplicaStatusMap { + replicaAddress := replicaStatus.Address replicaIP, replicaPort, err := net.SplitHostPort(replicaAddress) if err != nil { return err @@ -1884,7 +1959,7 @@ func (e *Engine) BackupRestoreFinish(spdkClient *spdkclient.Client) error { if err != nil { return err } - replicaBdevList = append(replicaBdevList, bdevName) + replicaBdevList = append(replicaBdevList, replicaStatus.BdevName) } e.log.Infof("Creating raid bdev %s with replicas %+v before finishing restoration", e.Name, replicaBdevList) @@ -1908,16 +1983,16 @@ func (e *Engine) RestoreStatus() (*spdkrpc.RestoreStatusResponse, error) { e.Lock() defer e.Unlock() - for replicaName, replicaAddress := range e.ReplicaAddressMap { - if e.ReplicaModeMap[replicaName] != types.ModeRW { + for replicaName, replicaStatus := range e.ReplicaStatusMap { + if replicaStatus.Mode != types.ModeRW { continue } - status, err := e.getReplicaRestoreStatus(replicaName, replicaAddress) + restoreStatus, err := e.getReplicaRestoreStatus(replicaName, replicaStatus.Address) if err != nil { return nil, err } - resp.Status[replicaAddress] = status + resp.Status[replicaStatus.Address] = restoreStatus } return resp, nil @@ -2244,14 +2319,13 @@ func (e *Engine) DeleteTarget(spdkClient *spdkclient.Client, superiorPortAllocat return errors.Wrapf(err, "failed to delete raid bdev after engine %s target switchover", e.Name) } - for replicaName := range e.ReplicaAddressMap { + for replicaName, replicaStatus := range e.ReplicaStatusMap { e.log.Infof("Disconnecting replica %s after target switchover", replicaName) - if err := disconnectNVMfBdev(spdkClient, e.ReplicaBdevNameMap[replicaName]); err != nil { - e.log.WithError(err).Warnf("Engine failed to disconnect replica %s after target switchover, will mark the replica mode from %v to ERR", replicaName, e.ReplicaModeMap[replicaName]) - if e.ReplicaModeMap[replicaName] != types.ModeERR { - e.ReplicaModeMap[replicaName] = types.ModeERR - } + if err := disconnectNVMfBdev(spdkClient, replicaStatus.BdevName); err != nil { + e.log.WithError(err).Warnf("Engine failed to disconnect replica %s after target switchover, will mark the replica mode from %v to ERR", replicaName, replicaStatus.Mode) + replicaStatus.Mode = types.ModeERR } + replicaStatus.BdevName = "" } return nil } diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go index e76e937ac..f86bf995a 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/replica.go @@ -612,6 +612,11 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio } headSvcLvol := r.ActiveChain[r.ChainLength-1] + if headSvcLvol.UUID == "" && r.State == types.InstanceStateStopped { + r.log.Debugf("Updating replica %s state from %v to %v because headSvcLvol.UUID is empty", r.Name, r.State, types.InstanceStatePending) + r.State = types.InstanceStatePending + } + // Create bdev lvol if the replica is the new one if r.State == types.InstanceStatePending { var lvsList []spdktypes.LvstoreInfo @@ -636,17 +641,46 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio return nil, fmt.Errorf("found mismatching between the actual lvstore name %s with UUID %s and the recorded lvstore name %s with UUID %s during replica %s creation", lvsList[0].Name, lvsList[0].UUID, r.LvsName, r.LvsUUID, r.Name) } - r.log.Info("Creating a lvol bdev for the new replica") - if _, err := spdkClient.BdevLvolCreate("", r.LvsUUID, r.Name, util.BytesToMiB(r.SpecSize), "", true); err != nil { - return nil, err - } bdevLvolList, err := spdkClient.BdevLvolGet(r.Alias, 0) - if err != nil { - return nil, err + if err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to check existence of lvol bdev for the new replica %v", r.Name) + } + + if len(bdevLvolList) == 0 { + r.log.Info("Creating a lvol bdev for the new replica") + if _, err := spdkClient.BdevLvolCreate("", r.LvsUUID, r.Name, util.BytesToMiB(r.SpecSize), "", true); err != nil { + return nil, err + } + bdevLvolList, err = spdkClient.BdevLvolGet(r.Alias, 0) + if err != nil { + return nil, err + } + } else { + r.log.Infof("Skipping creating a lvol bdev %v during replica creation because it already exists", r.Alias) + + replicaLvolFilter := func(bdev *spdktypes.BdevInfo) bool { + var lvolName string + if len(bdev.Aliases) == 1 { + lvolName = spdktypes.GetLvolNameFromAlias(bdev.Aliases[0]) + } + return IsReplicaLvol(r.Name, lvolName) || (r.ActiveChain[0] != nil && r.ActiveChain[0].Name == lvolName) + } + bdevLvolMap, err := GetBdevLvolMapWithFilter(spdkClient, replicaLvolFilter) + if err != nil { + return nil, err + } + + r.log.Infof("Constructing replica %v object during replica creation", r.Name) + err = r.construct(bdevLvolMap) + if err != nil { + return nil, err + } } + if len(bdevLvolList) < 1 { return nil, fmt.Errorf("cannot find lvol %v after creation", r.Alias) } + headSvcLvol.UUID = bdevLvolList[0].UUID headSvcLvol.CreationTime = bdevLvolList[0].CreationTime headSvcLvol.ActualSize = bdevLvolList[0].DriverSpecific.Lvol.NumAllocatedClusters * defaultClusterSize @@ -670,9 +704,17 @@ func (r *Replica) Create(spdkClient *spdkclient.Client, portCount int32, superio } r.portAllocator = bitmap + nqn := helpertypes.GetNQN(r.Name) + + // Blindly stop exposing the bdev if it exists. This is to avoid potential inconsistencies during salvage case. + if err := spdkClient.StopExposeBdev(nqn); err != nil && !jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err) { + return nil, errors.Wrapf(err, "failed to stop expose replica %v", r.Name) + } + nguid := commonutils.RandomID(nvmeNguidLength) - if err := spdkClient.StartExposeBdev(helpertypes.GetNQN(r.Name), headSvcLvol.UUID, nguid, podIP, strconv.Itoa(int(r.PortStart))); err != nil { - return nil, err + + if err := spdkClient.StartExposeBdev(nqn, headSvcLvol.UUID, nguid, podIP, strconv.Itoa(int(r.PortStart))); err != nil { + return nil, errors.Wrapf(err, "failed to expose replica %v", r.Name) } r.IsExposed = true r.State = types.InstanceStateRunning diff --git a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go index 7d0b26582..2e40fec8f 100644 --- a/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go +++ b/vendor/github.com/longhorn/longhorn-spdk-engine/pkg/spdk/server.go @@ -865,7 +865,7 @@ func (s *Server) EngineCreate(ctx context.Context, req *spdkrpc.EngineCreateRequ spdkClient := s.spdkClient s.Unlock() - return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired) + return e.Create(spdkClient, req.ReplicaAddressMap, req.PortCount, s.portAllocator, req.InitiatorAddress, req.TargetAddress, req.UpgradeRequired, req.SalvageRequested) } func localTargetExists(e *Engine) bool { diff --git a/vendor/modules.txt b/vendor/modules.txt index c1563ea05..5ca72d9ee 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -267,7 +267,7 @@ github.com/longhorn/longhorn-engine/pkg/sync github.com/longhorn/longhorn-engine/pkg/types github.com/longhorn/longhorn-engine/pkg/util github.com/longhorn/longhorn-engine/pkg/util/disk -# github.com/longhorn/longhorn-spdk-engine v0.0.0-20241023025831-ecc7b8a48d56 +# github.com/longhorn/longhorn-spdk-engine v0.0.0-20241106035350-4642db182def ## explicit; go 1.22.0 github.com/longhorn/longhorn-spdk-engine/pkg/api github.com/longhorn/longhorn-spdk-engine/pkg/client From f324a73f4f6f8a6d9ea7c4216d9928d3f016fa65 Mon Sep 17 00:00:00 2001 From: Chin-Ya Huang Date: Fri, 20 Sep 2024 15:34:52 +0800 Subject: [PATCH 2/2] feat(v2/salvage): identify engine instance created to salvage longhorn/longhorn-8430 Signed-off-by: Chin-Ya Huang --- pkg/client/instance.go | 2 ++ pkg/instance/instance.go | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/client/instance.go b/pkg/client/instance.go index 6b4f67083..c2f862385 100644 --- a/pkg/client/instance.go +++ b/pkg/client/instance.go @@ -91,6 +91,7 @@ type EngineCreateRequest struct { InitiatorAddress string TargetAddress string UpgradeRequired bool + SalvageRequested bool } type ReplicaCreateRequest struct { @@ -147,6 +148,7 @@ func (c *InstanceServiceClient) InstanceCreate(req *InstanceCreateRequest) (*api Size: req.Size, ReplicaAddressMap: req.Engine.ReplicaAddressMap, Frontend: req.Engine.Frontend, + SalvageRequested: req.Engine.SalvageRequested, } case types.InstanceTypeReplica: spdkInstanceSpec = &rpc.SpdkInstanceSpec{ diff --git a/pkg/instance/instance.go b/pkg/instance/instance.go index a22d65647..a534ed39c 100644 --- a/pkg/instance/instance.go +++ b/pkg/instance/instance.go @@ -151,7 +151,7 @@ func (ops V2DataEngineInstanceOps) InstanceCreate(req *rpc.InstanceCreateRequest switch req.Spec.Type { case types.InstanceTypeEngine: engine, err := c.EngineCreate(req.Spec.Name, req.Spec.VolumeName, req.Spec.SpdkInstanceSpec.Frontend, req.Spec.SpdkInstanceSpec.Size, req.Spec.SpdkInstanceSpec.ReplicaAddressMap, - req.Spec.PortCount, req.Spec.InitiatorAddress, req.Spec.TargetAddress, req.Spec.UpgradeRequired) + req.Spec.PortCount, req.Spec.InitiatorAddress, req.Spec.TargetAddress, req.Spec.UpgradeRequired, req.Spec.SpdkInstanceSpec.SalvageRequested) if err != nil { return nil, err }