From adcba0272891c221caf9a5d398d9db23540e24dc Mon Sep 17 00:00:00 2001 From: Derek Su Date: Sun, 23 Jun 2024 12:28:18 +0000 Subject: [PATCH] feat(initiator): support suspension and target switchover Longhorn 6001 Signed-off-by: Derek Su --- app/cmd/nvmecli/nvmecli.go | 2 +- pkg/nvme/initiator.go | 226 +++++++++++++++++++++++++++++++------ pkg/spdk/spdk_test.go | 113 +++++++++++++++++++ pkg/util/device.go | 2 +- 4 files changed, 304 insertions(+), 39 deletions(-) diff --git a/app/cmd/nvmecli/nvmecli.go b/app/cmd/nvmecli/nvmecli.go index fd92820..d152140 100644 --- a/app/cmd/nvmecli/nvmecli.go +++ b/app/cmd/nvmecli/nvmecli.go @@ -221,7 +221,7 @@ func start(c *cli.Context) error { return err } - if _, err := initiator.Start(c.String("traddr"), c.String("trsvcid"), true, false); err != nil { + if _, err := initiator.Start(c.String("traddr"), c.String("trsvcid"), true); err != nil { return err } diff --git a/pkg/nvme/initiator.go b/pkg/nvme/initiator.go index 412592b..2e6a373 100644 --- a/pkg/nvme/initiator.go +++ b/pkg/nvme/initiator.go @@ -22,10 +22,11 @@ const ( LockFile = "/var/run/longhorn-spdk.lock" LockTimeout = 120 * time.Second - RetryCounts = 5 - RetryInterval = 3 * time.Second + maxNumRetries = 15 + retryInterval = 1 * time.Second - waitDeviceTimeout = 60 * time.Second + maxNumWaitDeviceRetries = 60 + waitDeviceInterval = 1 * time.Second HostProc = "/host/proc" @@ -51,6 +52,7 @@ type Initiator struct { logger logrus.FieldLogger } +// NewInitiator creates a new NVMe initiator func NewInitiator(name, subsystemNQN, hostProc string) (*Initiator, error) { if name == "" || subsystemNQN == "" { return nil, fmt.Errorf("empty name or subsystem for initiator creation") @@ -78,6 +80,87 @@ func NewInitiator(name, subsystemNQN, hostProc string) (*Initiator, error) { }, nil } +// DiscoverTarget discovers a target +func (i *Initiator) DiscoverTarget(ip, port string) (string, error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return "", errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + return DiscoverTarget(ip, port, i.executor) +} + +// ConnectTarget connects to a target +func (i *Initiator) ConnectTarget(ip, port, nqn string) (string, error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return "", errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + return ConnectTarget(ip, port, nqn, i.executor) +} + +// DisconnectTarget disconnects a target +func (i *Initiator) DisconnectTarget() error { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + return DisconnectTarget(i.SubsystemNQN, i.executor) +} + +// WaitForConnect waits for the NVMe initiator to connect +func (i *Initiator) WaitForConnect(maxNumRetries int, retryInterval time.Duration) (err error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + for r := 0; r < maxNumRetries; r++ { + err = i.loadNVMeDeviceInfoWithoutLock() + if err == nil { + return nil + } + time.Sleep(retryInterval) + } + + return err +} + +// WaitForDisconnect waits for the NVMe initiator to disconnect +func (i *Initiator) WaitForDisconnect(maxNumRetries int, retryInterval time.Duration) (err error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + for r := 0; r < maxNumRetries; r++ { + err = i.loadNVMeDeviceInfoWithoutLock() + if strings.Contains(err.Error(), "cannot find a valid nvme device") { + return nil + } + time.Sleep(retryInterval) + } + + return err +} + // Suspend suspends the device mapper device for the NVMe initiator func (i *Initiator) Suspend(noflush, nolockfs bool) error { if i.hostProc != "" { @@ -88,16 +171,53 @@ func (i *Initiator) Suspend(noflush, nolockfs bool) error { defer lock.Unlock() } - if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil { - return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name) + suspended, err := i.IsSuspended() + if err != nil { + return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name) + } + + if !suspended { + if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil { + return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name) + } + } + + return nil +} + +// Resume resumes the device mapper device for the NVMe initiator +func (i *Initiator) Resume() error { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name) + } + defer lock.Unlock() + } + + if err := i.resumeLinearDmDevice(); err != nil { + return errors.Wrapf(err, "failed to resume device mapper device for NVMe initiator %s", i.Name) } return nil } +func (i *Initiator) resumeLinearDmDevice() error { + logrus.Infof("Resuming linear dm device %s", i.Name) + + return util.DmsetupResume(i.Name, i.executor) +} + func (i *Initiator) replaceDmDeviceTarget() error { - if err := i.suspendLinearDmDevice(true, false); err != nil { - return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name) + suspended, err := i.IsSuspended() + if err != nil { + return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name) + } + + if !suspended { + if err := i.suspendLinearDmDevice(true, false); err != nil { + return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name) + } } if err := i.reloadLinearDmDevice(); err != nil { @@ -111,13 +231,21 @@ func (i *Initiator) replaceDmDeviceTarget() error { } // Start starts the NVMe initiator with the given transportAddress and transportServiceID -func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDeviceCleanup bool) (dmDeviceBusy bool, err error) { +func (i *Initiator) Start(transportAddress, transportServiceID string, dmDeviceCleanupRequired bool) (dmDeviceBusy bool, err error) { defer func() { if err != nil { err = errors.Wrapf(err, "failed to start NVMe initiator %s", i.Name) } }() + i.logger.WithFields(logrus.Fields{ + "transportAddress": transportAddress, + "transportServiceID": transportServiceID, + "dmDeviceCleanupRequired": dmDeviceCleanupRequired, + }) + + i.logger.Info("Starting initiator") + if transportAddress == "" || transportServiceID == "" { return false, fmt.Errorf("invalid TransportAddress %s and TransportServiceID %s for initiator %s start", transportAddress, transportServiceID, i.Name) } @@ -133,10 +261,6 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev // Check if the initiator/NVMe device is already launched and matches the params if err := i.loadNVMeDeviceInfoWithoutLock(); err == nil { if i.TransportAddress == transportAddress && i.TransportServiceID == transportServiceID { - i.logger.WithFields(logrus.Fields{ - "transportAddress": transportAddress, - "transportServiceID": transportServiceID, - }) if err = i.LoadEndpoint(false); err == nil { i.logger.Info("NVMe initiator is already launched with correct params") return false, nil @@ -149,7 +273,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev } i.logger.Infof("Stopping NVMe initiator blindly before starting") - dmDeviceBusy, err = i.stopWithoutLock(needDmDeviceCleanup, false, false) + dmDeviceBusy, err = i.stopWithoutLock(dmDeviceCleanupRequired, false, false) if err != nil { return dmDeviceBusy, errors.Wrapf(err, "failed to stop the mismatching NVMe initiator %s before starting", i.Name) } @@ -161,42 +285,48 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev i.logger.Infof("Launching NVMe initiator") // Setup initiator - for counter := 0; counter < RetryCounts; counter++ { + for r := 0; r < maxNumRetries; r++ { // Rerun this API for a discovered target should be fine - if i.SubsystemNQN, err = DiscoverTarget(transportAddress, transportServiceID, i.executor); err != nil { - i.logger.WithError(err).Warnf("Failed to discover") - time.Sleep(RetryInterval) + subsystemNQN, err := DiscoverTarget(transportAddress, transportServiceID, i.executor) + if err != nil { + i.logger.WithError(err).Warn("Failed to discover target") + time.Sleep(retryInterval) continue } - if i.ControllerName, err = ConnectTarget(transportAddress, transportServiceID, i.SubsystemNQN, i.executor); err != nil { - i.logger.WithError(err).Warnf("Failed to connect target") - time.Sleep(RetryInterval) + controllerName, err := ConnectTarget(transportAddress, transportServiceID, subsystemNQN, i.executor) + if err != nil { + i.logger.WithError(err).Warn("Failed to connect target") + time.Sleep(retryInterval) continue } + + i.SubsystemNQN = subsystemNQN + i.ControllerName = controllerName break } if i.ControllerName == "" { - return dmDeviceBusy, fmt.Errorf("failed to start NVMe initiator %s within %d * %v sec retries", i.Name, RetryCounts, RetryInterval.Seconds()) + return dmDeviceBusy, fmt.Errorf("failed to start NVMe initiator %s within %d * %v sec retries", i.Name, maxNumRetries, retryInterval.Seconds()) } - for t := 0; t < int(waitDeviceTimeout.Seconds()); t++ { - if err = i.loadNVMeDeviceInfoWithoutLock(); err == nil { + for r := 0; r < maxNumWaitDeviceRetries; r++ { + err = i.loadNVMeDeviceInfoWithoutLock() + if err == nil { break } - time.Sleep(time.Second) + time.Sleep(waitDeviceInterval) } if err != nil { return dmDeviceBusy, errors.Wrapf(err, "failed to load device info after starting NVMe initiator %s", i.Name) } needMakeEndpoint := true - if needDmDeviceCleanup { + if dmDeviceCleanupRequired { if dmDeviceBusy { // Endpoint is already created, just replace the target device needMakeEndpoint = false - i.logger.Infof("Linear dm device %s is busy, trying the best to replace the target device for NVMe initiator %s", i.Name, i.Name) + i.logger.Infof("Linear dm device is busy, trying the best to replace the target device for NVMe initiator %s", i.Name) if err := i.replaceDmDeviceTarget(); err != nil { i.logger.WithError(err).Warnf("Failed to replace the target device for NVMe initiator %s", i.Name) } else { @@ -206,7 +336,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev } else { i.logger.Infof("Creating linear dm device for NVMe initiator %s", i.Name) if err := i.createLinearDmDevice(); err != nil { - return dmDeviceBusy, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name) + return false, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name) } } } else { @@ -226,7 +356,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev return dmDeviceBusy, nil } -func (i *Initiator) Stop(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { +func (i *Initiator) Stop(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { if i.hostProc != "" { lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) if err := lock.Lock(); err != nil { @@ -235,7 +365,7 @@ func (i *Initiator) Stop(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmD defer lock.Unlock() } - return i.stopWithoutLock(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice) + return i.stopWithoutLock(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice) } func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { @@ -254,9 +384,9 @@ func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmD return false, nil } -func (i *Initiator) stopWithoutLock(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { +func (i *Initiator) stopWithoutLock(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) { dmDeviceBusy := false - if needDmDeviceCleanup { + if dmDeviceCleanupRequired { var err error dmDeviceBusy, err = i.removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice) if err != nil { @@ -376,10 +506,10 @@ func (i *Initiator) LoadEndpoint(dmDeviceBusy bool) error { } if dmDeviceBusy { - i.logger.Debugf("Skipping endpoint %v loading for NVMe initiator %s due to device busy", i.Endpoint, i.Name) + i.logger.Debugf("Skipping endpoint %v loading for NVMe initiator %v due to device busy", i.Endpoint, i.Name) } else { if i.NamespaceName != "" && !i.isNamespaceExist(depDevices) { - return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Nvme.Name, i.Endpoint, i.Name) + return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Name, i.Endpoint, i.Name) } } @@ -476,10 +606,32 @@ func (i *Initiator) suspendLinearDmDevice(noflush, nolockfs bool) error { return util.DmsetupSuspend(i.Name, noflush, nolockfs, i.executor) } -func (i *Initiator) resumeLinearDmDevice() error { - logrus.Infof("Resuming linear dm device %s", i.Name) +// ReloadDmDevice reloads the linear dm device +func (i *Initiator) ReloadDmDevice() (err error) { + if i.hostProc != "" { + lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout) + if err := lock.Lock(); err != nil { + return errors.Wrapf(err, "failed to get file lock for NVMe initiator %s", i.Name) + } + defer lock.Unlock() + } - return util.DmsetupResume(i.Name, i.executor) + return i.reloadLinearDmDevice() +} + +// IsSuspended checks if the linear dm device is suspended +func (i *Initiator) IsSuspended() (bool, error) { + devices, err := util.DmsetupInfo(i.Name, i.executor) + if err != nil { + return false, err + } + + for _, device := range devices { + if device.Name == i.Name { + return device.Suspended, nil + } + } + return false, fmt.Errorf("failed to find linear dm device %s", i.Name) } func (i *Initiator) reloadLinearDmDevice() error { @@ -500,7 +652,7 @@ func (i *Initiator) reloadLinearDmDevice() error { table := fmt.Sprintf("0 %v linear %v 0", sectors, devPath) - logrus.Infof("Reloading linear dm device %s with table %s", i.Name, table) + logrus.Infof("Reloading linear dm device %s with table '%s'", i.Name, table) return util.DmsetupReload(i.Name, table, i.executor) } diff --git a/pkg/spdk/spdk_test.go b/pkg/spdk/spdk_test.go index 97ccc5e..376eb0a 100644 --- a/pkg/spdk/spdk_test.go +++ b/pkg/spdk/spdk_test.go @@ -470,3 +470,116 @@ func (s *TestSuite) TestSPDKClientMultiThread(c *C) { wg.Wait() } + +func (s *TestSuite) TestSPDKEngineSuspend(c *C) { + fmt.Println("Testing Engine Suspend") + + ne, err := util.NewExecutor(commonTypes.ProcDirectory) + c.Assert(err, IsNil) + + LaunchTestSPDKTarget(c, ne.Execute) + PrepareDeviceFile(c) + defer func() { + os.RemoveAll(defaultDevicePath) + }() + + spdkCli, err := client.NewClient(context.Background()) + c.Assert(err, IsNil) + + // Do blindly cleanup + err = spdkCli.DeleteDevice(defaultDeviceName, defaultDeviceName) + if err != nil { + c.Assert(jsonrpc.IsJSONRPCRespErrorNoSuchDevice(err), Equals, true) + } + + bdevAioName, lvsName, lvsUUID, err := spdkCli.AddDevice(defaultDevicePath, defaultDeviceName, types.MiB) + c.Assert(err, IsNil) + defer func() { + err := spdkCli.DeleteDevice(bdevAioName, lvsName) + c.Assert(err, IsNil) + }() + c.Assert(bdevAioName, Equals, defaultDeviceName) + c.Assert(lvsName, Equals, defaultDeviceName) + c.Assert(lvsUUID, Not(Equals), "") + + bdevAioInfoList, err := spdkCli.BdevAioGet(bdevAioName, 0) + c.Assert(err, IsNil) + c.Assert(len(bdevAioInfoList), Equals, 1) + bdevAio := bdevAioInfoList[0] + c.Assert(bdevAio.Name, Equals, bdevAioName) + c.Assert(uint64(bdevAio.BlockSize)*bdevAio.NumBlocks, Equals, defaultDeviceSize) + + lvsList, err := spdkCli.BdevLvolGetLvstore(lvsName, "") + c.Assert(err, IsNil) + c.Assert(len(lvsList), Equals, 1) + lvs := lvsList[0] + c.Assert(lvs.UUID, Equals, lvsUUID) + c.Assert(lvs.BaseBdev, Equals, bdevAio.Name) + c.Assert(int(lvs.BlockSize), Equals, int(bdevAio.BlockSize)) + // The meta info may take space + c.Assert(lvs.ClusterSize*(lvs.TotalDataClusters+4), Equals, defaultDeviceSize) + + lvolName1, lvolName2 := "test-lvol1", "test-lvol2" + lvolUUID1, err := spdkCli.BdevLvolCreate(lvsName, "", lvolName1, defaultLvolSizeInMiB, "", true) + c.Assert(err, IsNil) + defer func() { + deleted, err := spdkCli.BdevLvolDelete(lvolUUID1) + c.Assert(err, IsNil) + c.Assert(deleted, Equals, true) + }() + lvolUUID2, err := spdkCli.BdevLvolCreate("", lvsUUID, lvolName2, defaultLvolSizeInMiB, "", true) + c.Assert(err, IsNil) + defer func() { + deleted, err := spdkCli.BdevLvolDelete(lvolUUID2) + c.Assert(err, IsNil) + c.Assert(deleted, Equals, true) + }() + + lvolList, err := spdkCli.BdevLvolGet("", 0) + c.Assert(err, IsNil) + c.Assert(len(lvolList), Equals, 2) + + raidName := "test-raid" + created, err := spdkCli.BdevRaidCreate(raidName, spdktypes.BdevRaidLevelRaid1, 0, []string{lvolUUID1, lvolUUID2}) + c.Assert(err, IsNil) + c.Assert(created, Equals, true) + defer func() { + deleted, err := spdkCli.BdevRaidDelete(raidName) + c.Assert(err, IsNil) + c.Assert(deleted, Equals, true) + }() + + nqn := types.GetNQN(raidName) + err = spdkCli.StartExposeBdev(nqn, raidName, "ABCDEF0123456789ABCDEF0123456789", types.LocalIP, defaultPort1) + c.Assert(err, IsNil) + defer func() { + err = spdkCli.StopExposeBdev(nqn) + c.Assert(err, IsNil) + }() + + initiator, err := nvme.NewInitiator(raidName, nqn, nvme.HostProc) + c.Assert(err, IsNil) + + dmDeviceBusy, err := initiator.Start(types.LocalIP, defaultPort1, true) + c.Assert(dmDeviceBusy, Equals, false) + c.Assert(err, IsNil) + defer func() { + dmDeviceBusy, err = initiator.Stop(true, true, true) + c.Assert(dmDeviceBusy, Equals, false) + c.Assert(err, IsNil) + }() + + err = initiator.Suspend(true, true) + c.Assert(err, IsNil) + + suspended, err := initiator.IsSuspended() + c.Assert(err, IsNil) + c.Assert(suspended, Equals, true) + + err = initiator.LoadEndpoint(dmDeviceBusy) + c.Assert(err, IsNil) + c.Assert(initiator.GetEndpoint(), Equals, "/dev/longhorn/test-raid") + + err = initiator.Resume() + c.Assert(err, IsNil) +} diff --git a/pkg/util/device.go b/pkg/util/device.go index e2c9a69..f55f4fd 100644 --- a/pkg/util/device.go +++ b/pkg/util/device.go @@ -115,7 +115,7 @@ func DetectDevice(path string, executor *commonNs.Executor) (*BlockDevice, error f := strings.Fields(line) if len(f) == 2 { dev = &BlockDevice{ - Name: f[0], + Name: f[0], } _, err = fmt.Sscanf(f[1], "%d:%d", &dev.Major, &dev.Minor) if err != nil {