Skip to content

Commit

Permalink
feat(initiator): support suspension and target switchover
Browse files Browse the repository at this point in the history
Longhorn 6001

Signed-off-by: Derek Su <derek.su@suse.com>
  • Loading branch information
derekbit committed Jun 30, 2024
1 parent ece200a commit 6797616
Show file tree
Hide file tree
Showing 4 changed files with 304 additions and 39 deletions.
2 changes: 1 addition & 1 deletion app/cmd/nvmecli/nvmecli.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func start(c *cli.Context) error {
return err
}

if _, err := initiator.Start(c.String("traddr"), c.String("trsvcid"), true, false); err != nil {
if _, err := initiator.Start(c.String("traddr"), c.String("trsvcid"), true); err != nil {
return err
}

Expand Down
226 changes: 189 additions & 37 deletions pkg/nvme/initiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ const (
LockFile = "/var/run/longhorn-spdk.lock"
LockTimeout = 120 * time.Second

RetryCounts = 5
RetryInterval = 3 * time.Second
maxNumRetries = 15
retryInterval = 1 * time.Second

waitDeviceTimeout = 60 * time.Second
maxNumWaitDeviceRetries = 60
waitDeviceInterval = 1 * time.Second

HostProc = "/host/proc"

Expand All @@ -51,6 +52,7 @@ type Initiator struct {
logger logrus.FieldLogger
}

// NewInitiator creates a new NVMe initiator
func NewInitiator(name, subsystemNQN, hostProc string) (*Initiator, error) {
if name == "" || subsystemNQN == "" {
return nil, fmt.Errorf("empty name or subsystem for initiator creation")
Expand Down Expand Up @@ -78,6 +80,87 @@ func NewInitiator(name, subsystemNQN, hostProc string) (*Initiator, error) {
}, nil
}

// DiscoverTarget discovers a target
func (i *Initiator) DiscoverTarget(ip, port string) (string, error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return "", errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()

Check warning on line 90 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L84-L90

Added lines #L84 - L90 were not covered by tests
}

return DiscoverTarget(ip, port, i.executor)

Check warning on line 93 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L93

Added line #L93 was not covered by tests
}

// ConnectTarget connects to a target
func (i *Initiator) ConnectTarget(ip, port, nqn string) (string, error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return "", errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()

Check warning on line 103 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L97-L103

Added lines #L97 - L103 were not covered by tests
}

return ConnectTarget(ip, port, nqn, i.executor)

Check warning on line 106 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L106

Added line #L106 was not covered by tests
}

// DisconnectTarget disconnects a target
func (i *Initiator) DisconnectTarget() error {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()

Check warning on line 116 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L110-L116

Added lines #L110 - L116 were not covered by tests
}

return DisconnectTarget(i.SubsystemNQN, i.executor)

Check warning on line 119 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L119

Added line #L119 was not covered by tests
}

// WaitForConnect waits for the NVMe initiator to connect
func (i *Initiator) WaitForConnect(maxNumRetries int, retryInterval time.Duration) (err error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()

Check warning on line 129 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L123-L129

Added lines #L123 - L129 were not covered by tests
}

for r := 0; r < maxNumRetries; r++ {
err = i.loadNVMeDeviceInfoWithoutLock()
if err == nil {
return nil
}
time.Sleep(retryInterval)

Check warning on line 137 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L132-L137

Added lines #L132 - L137 were not covered by tests
}

return err

Check warning on line 140 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L140

Added line #L140 was not covered by tests
}

// WaitForDisconnect waits for the NVMe initiator to disconnect
func (i *Initiator) WaitForDisconnect(maxNumRetries int, retryInterval time.Duration) (err error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}
defer lock.Unlock()

Check warning on line 150 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L144-L150

Added lines #L144 - L150 were not covered by tests
}

for r := 0; r < maxNumRetries; r++ {
err = i.loadNVMeDeviceInfoWithoutLock()
if strings.Contains(err.Error(), "cannot find a valid nvme device") {
return nil
}
time.Sleep(retryInterval)

Check warning on line 158 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L153-L158

Added lines #L153 - L158 were not covered by tests
}

return err

Check warning on line 161 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L161

Added line #L161 was not covered by tests
}

// Suspend suspends the device mapper device for the NVMe initiator
func (i *Initiator) Suspend(noflush, nolockfs bool) error {
if i.hostProc != "" {
Expand All @@ -88,16 +171,53 @@ func (i *Initiator) Suspend(noflush, nolockfs bool) error {
defer lock.Unlock()
}

if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil {
return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name)
suspended, err := i.IsSuspended()
if err != nil {
return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name)
}

Check warning on line 177 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L176-L177

Added lines #L176 - L177 were not covered by tests

if !suspended {
if err := i.suspendLinearDmDevice(noflush, nolockfs); err != nil {
return errors.Wrapf(err, "failed to suspend device mapper device for NVMe initiator %s", i.Name)
}

Check warning on line 182 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L181-L182

Added lines #L181 - L182 were not covered by tests
}

return nil
}

// Resume resumes the device mapper device for the NVMe initiator
func (i *Initiator) Resume() error {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for initiator %s", i.Name)
}

Check warning on line 194 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L193-L194

Added lines #L193 - L194 were not covered by tests
defer lock.Unlock()
}

if err := i.resumeLinearDmDevice(); err != nil {
return errors.Wrapf(err, "failed to resume device mapper device for NVMe initiator %s", i.Name)

Check warning on line 199 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L199

Added line #L199 was not covered by tests
}

return nil
}

func (i *Initiator) resumeLinearDmDevice() error {
logrus.Infof("Resuming linear dm device %s", i.Name)

return util.DmsetupResume(i.Name, i.executor)
}

func (i *Initiator) replaceDmDeviceTarget() error {
if err := i.suspendLinearDmDevice(true, false); err != nil {
return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name)
suspended, err := i.IsSuspended()
if err != nil {
return errors.Wrapf(err, "failed to check if linear dm device is suspended for NVMe initiator %s", i.Name)
}

Check warning on line 215 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L212-L215

Added lines #L212 - L215 were not covered by tests

if !suspended {
if err := i.suspendLinearDmDevice(true, false); err != nil {
return errors.Wrapf(err, "failed to suspend linear dm device for NVMe initiator %s", i.Name)
}

Check warning on line 220 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L217-L220

Added lines #L217 - L220 were not covered by tests
}

if err := i.reloadLinearDmDevice(); err != nil {
Expand All @@ -111,13 +231,21 @@ func (i *Initiator) replaceDmDeviceTarget() error {
}

// Start starts the NVMe initiator with the given transportAddress and transportServiceID
func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDeviceCleanup bool) (dmDeviceBusy bool, err error) {
func (i *Initiator) Start(transportAddress, transportServiceID string, dmDeviceCleanupRequired bool) (dmDeviceBusy bool, err error) {
defer func() {
if err != nil {
err = errors.Wrapf(err, "failed to start NVMe initiator %s", i.Name)
}
}()

i.logger.WithFields(logrus.Fields{
"transportAddress": transportAddress,
"transportServiceID": transportServiceID,
"dmDeviceCleanupRequired": dmDeviceCleanupRequired,
})

i.logger.Info("Starting initiator")

if transportAddress == "" || transportServiceID == "" {
return false, fmt.Errorf("invalid TransportAddress %s and TransportServiceID %s for initiator %s start", transportAddress, transportServiceID, i.Name)
}
Expand All @@ -133,10 +261,6 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev
// Check if the initiator/NVMe device is already launched and matches the params
if err := i.loadNVMeDeviceInfoWithoutLock(); err == nil {
if i.TransportAddress == transportAddress && i.TransportServiceID == transportServiceID {
i.logger.WithFields(logrus.Fields{
"transportAddress": transportAddress,
"transportServiceID": transportServiceID,
})
if err = i.LoadEndpoint(false); err == nil {
i.logger.Info("NVMe initiator is already launched with correct params")
return false, nil
Expand All @@ -149,7 +273,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev
}

i.logger.Infof("Stopping NVMe initiator blindly before starting")
dmDeviceBusy, err = i.stopWithoutLock(needDmDeviceCleanup, false, false)
dmDeviceBusy, err = i.stopWithoutLock(dmDeviceCleanupRequired, false, false)
if err != nil {
return dmDeviceBusy, errors.Wrapf(err, "failed to stop the mismatching NVMe initiator %s before starting", i.Name)
}
Expand All @@ -161,42 +285,48 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev
i.logger.Infof("Launching NVMe initiator")

// Setup initiator
for counter := 0; counter < RetryCounts; counter++ {
for r := 0; r < maxNumRetries; r++ {
// Rerun this API for a discovered target should be fine
if i.SubsystemNQN, err = DiscoverTarget(transportAddress, transportServiceID, i.executor); err != nil {
i.logger.WithError(err).Warnf("Failed to discover")
time.Sleep(RetryInterval)
subsystemNQN, err := DiscoverTarget(transportAddress, transportServiceID, i.executor)
if err != nil {
i.logger.WithError(err).Warn("Failed to discover target")
time.Sleep(retryInterval)

Check warning on line 293 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L292-L293

Added lines #L292 - L293 were not covered by tests
continue
}

if i.ControllerName, err = ConnectTarget(transportAddress, transportServiceID, i.SubsystemNQN, i.executor); err != nil {
i.logger.WithError(err).Warnf("Failed to connect target")
time.Sleep(RetryInterval)
controllerName, err := ConnectTarget(transportAddress, transportServiceID, subsystemNQN, i.executor)
if err != nil {
i.logger.WithError(err).Warn("Failed to connect target")
time.Sleep(retryInterval)

Check warning on line 300 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L299-L300

Added lines #L299 - L300 were not covered by tests
continue
}

i.SubsystemNQN = subsystemNQN
i.ControllerName = controllerName
break
}

if i.ControllerName == "" {
return dmDeviceBusy, fmt.Errorf("failed to start NVMe initiator %s within %d * %v sec retries", i.Name, RetryCounts, RetryInterval.Seconds())
return dmDeviceBusy, fmt.Errorf("failed to start NVMe initiator %s within %d * %v sec retries", i.Name, maxNumRetries, retryInterval.Seconds())

Check warning on line 310 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L310

Added line #L310 was not covered by tests
}

for t := 0; t < int(waitDeviceTimeout.Seconds()); t++ {
if err = i.loadNVMeDeviceInfoWithoutLock(); err == nil {
for r := 0; r < maxNumWaitDeviceRetries; r++ {
err = i.loadNVMeDeviceInfoWithoutLock()
if err == nil {
break
}
time.Sleep(time.Second)
time.Sleep(waitDeviceInterval)

Check warning on line 318 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L318

Added line #L318 was not covered by tests
}
if err != nil {
return dmDeviceBusy, errors.Wrapf(err, "failed to load device info after starting NVMe initiator %s", i.Name)
}

needMakeEndpoint := true
if needDmDeviceCleanup {
if dmDeviceCleanupRequired {
if dmDeviceBusy {
// Endpoint is already created, just replace the target device
needMakeEndpoint = false
i.logger.Infof("Linear dm device %s is busy, trying the best to replace the target device for NVMe initiator %s", i.Name, i.Name)
i.logger.Infof("Linear dm device is busy, trying the best to replace the target device for NVMe initiator %s", i.Name)

Check warning on line 329 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L329

Added line #L329 was not covered by tests
if err := i.replaceDmDeviceTarget(); err != nil {
i.logger.WithError(err).Warnf("Failed to replace the target device for NVMe initiator %s", i.Name)
} else {
Expand All @@ -206,7 +336,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev
} else {
i.logger.Infof("Creating linear dm device for NVMe initiator %s", i.Name)
if err := i.createLinearDmDevice(); err != nil {
return dmDeviceBusy, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name)
return false, errors.Wrapf(err, "failed to create linear dm device for NVMe initiator %s", i.Name)

Check warning on line 339 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L339

Added line #L339 was not covered by tests
}
}
} else {
Expand All @@ -226,7 +356,7 @@ func (i *Initiator) Start(transportAddress, transportServiceID string, needDmDev
return dmDeviceBusy, nil
}

func (i *Initiator) Stop(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
func (i *Initiator) Stop(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
Expand All @@ -235,7 +365,7 @@ func (i *Initiator) Stop(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmD
defer lock.Unlock()
}

return i.stopWithoutLock(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice)
return i.stopWithoutLock(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice)
}

func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
Expand All @@ -254,9 +384,9 @@ func (i *Initiator) removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmD
return false, nil
}

func (i *Initiator) stopWithoutLock(needDmDeviceCleanup, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
func (i *Initiator) stopWithoutLock(dmDeviceCleanupRequired, deferDmDeviceCleanup, errOnBusyDmDevice bool) (bool, error) {
dmDeviceBusy := false
if needDmDeviceCleanup {
if dmDeviceCleanupRequired {
var err error
dmDeviceBusy, err = i.removeDmDeviceAndEndpoint(deferDmDeviceCleanup, errOnBusyDmDevice)
if err != nil {
Expand Down Expand Up @@ -376,10 +506,10 @@ func (i *Initiator) LoadEndpoint(dmDeviceBusy bool) error {
}

if dmDeviceBusy {
i.logger.Debugf("Skipping endpoint %v loading for NVMe initiator %s due to device busy", i.Endpoint, i.Name)
i.logger.Debugf("Skipping endpoint %v loading for NVMe initiator %v due to device busy", i.Endpoint, i.Name)

Check warning on line 509 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L509

Added line #L509 was not covered by tests
} else {
if i.NamespaceName != "" && !i.isNamespaceExist(depDevices) {
return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Nvme.Name, i.Endpoint, i.Name)
return fmt.Errorf("detected device %s name mismatching from endpoint %v for NVMe initiator %s", dev.Name, i.Endpoint, i.Name)

Check warning on line 512 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L512

Added line #L512 was not covered by tests
}
}

Expand Down Expand Up @@ -476,10 +606,32 @@ func (i *Initiator) suspendLinearDmDevice(noflush, nolockfs bool) error {
return util.DmsetupSuspend(i.Name, noflush, nolockfs, i.executor)
}

func (i *Initiator) resumeLinearDmDevice() error {
logrus.Infof("Resuming linear dm device %s", i.Name)
// ReloadDmDevice reloads the linear dm device
func (i *Initiator) ReloadDmDevice() (err error) {
if i.hostProc != "" {
lock := nsfilelock.NewLockWithTimeout(util.GetHostNamespacePath(i.hostProc), LockFile, LockTimeout)
if err := lock.Lock(); err != nil {
return errors.Wrapf(err, "failed to get file lock for NVMe initiator %s", i.Name)
}
defer lock.Unlock()

Check warning on line 616 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L610-L616

Added lines #L610 - L616 were not covered by tests
}

return util.DmsetupResume(i.Name, i.executor)
return i.reloadLinearDmDevice()

Check warning on line 619 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L619

Added line #L619 was not covered by tests
}

// IsSuspended checks if the linear dm device is suspended
func (i *Initiator) IsSuspended() (bool, error) {
devices, err := util.DmsetupInfo(i.Name, i.executor)
if err != nil {
return false, err
}

Check warning on line 627 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L626-L627

Added lines #L626 - L627 were not covered by tests

for _, device := range devices {
if device.Name == i.Name {
return device.Suspended, nil
}
}
return false, fmt.Errorf("failed to find linear dm device %s", i.Name)

Check warning on line 634 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L634

Added line #L634 was not covered by tests
}

func (i *Initiator) reloadLinearDmDevice() error {
Expand All @@ -500,7 +652,7 @@ func (i *Initiator) reloadLinearDmDevice() error {

table := fmt.Sprintf("0 %v linear %v 0", sectors, devPath)

logrus.Infof("Reloading linear dm device %s with table %s", i.Name, table)
logrus.Infof("Reloading linear dm device %s with table '%s'", i.Name, table)

Check warning on line 655 in pkg/nvme/initiator.go

View check run for this annotation

Codecov / codecov/patch

pkg/nvme/initiator.go#L655

Added line #L655 was not covered by tests

return util.DmsetupReload(i.Name, table, i.executor)
}
Expand Down
Loading

0 comments on commit 6797616

Please sign in to comment.