diff --git a/pkg/controller/blockdevice/controller.go b/pkg/controller/blockdevice/controller.go index ac2adad3..63c5da0f 100644 --- a/pkg/controller/blockdevice/controller.go +++ b/pkg/controller/blockdevice/controller.go @@ -4,16 +4,10 @@ import ( "context" "errors" "fmt" - "os" - "path/filepath" "reflect" - "slices" - "sync" "time" gocommon "github.com/harvester/go-common" - ghwutil "github.com/jaypipes/ghw/pkg/util" - longhornv1 "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -25,6 +19,7 @@ import ( ctldiskv1 "github.com/harvester/node-disk-manager/pkg/generated/controllers/harvesterhci.io/v1beta1" ctllonghornv1 "github.com/harvester/node-disk-manager/pkg/generated/controllers/longhorn.io/v1beta2" "github.com/harvester/node-disk-manager/pkg/option" + "github.com/harvester/node-disk-manager/pkg/provisioner" "github.com/harvester/node-disk-manager/pkg/utils" ) @@ -32,47 +27,6 @@ const ( blockDeviceHandlerName = "harvester-block-device-handler" ) -// semaphore is a simple semaphore implementation in channel -type semaphore struct { - ch chan struct{} -} - -// newSemaphore creates a new semaphore with the given capacity. -func newSemaphore(n uint) *semaphore { - return &semaphore{ - ch: make(chan struct{}, n), - } -} - -// acquire a semaphore to prevent concurrent update -func (s *semaphore) acquire() bool { - logrus.Debugf("Pre-acquire channel stats: %d/%d", len(s.ch), cap(s.ch)) - select { - case s.ch <- struct{}{}: - return true - default: - // full - return false - } -} - -// release the semaphore -func (s *semaphore) release() bool { - select { - case <-s.ch: - return true - default: - // empty - return false - } -} - -type DiskTags struct { - diskTags map[string][]string - lock *sync.RWMutex - initialized bool -} - type Controller struct { Namespace string NodeName string @@ -85,7 +39,7 @@ type Controller struct { BlockInfo block.Info scanner *Scanner - semaphore *semaphore + semaphore *provisioner.Semaphore } type NeedMountUpdateOP int8 @@ -94,58 +48,13 @@ const ( NeedMountUpdateNoOp NeedMountUpdateOP = 1 << iota NeedMountUpdateMount NeedMountUpdateUnmount - - errorCacheDiskTagsNotInitialized = "CacheDiskTags is not initialized" ) func (f NeedMountUpdateOP) Has(flag NeedMountUpdateOP) bool { return f&flag != 0 } -var CacheDiskTags *DiskTags - -func (d *DiskTags) DeleteDiskTags(dev string) { - d.lock.Lock() - defer d.lock.Unlock() - - delete(d.diskTags, dev) -} - -func (d *DiskTags) UpdateDiskTags(dev string, tags []string) { - d.lock.Lock() - defer d.lock.Unlock() - - d.diskTags[dev] = tags -} - -func (d *DiskTags) UpdateInitialized() { - d.lock.Lock() - defer d.lock.Unlock() - - d.initialized = true -} - -func (d *DiskTags) Initialized() bool { - d.lock.RLock() - defer d.lock.RUnlock() - - return d.initialized -} - -func (d *DiskTags) GetDiskTags(dev string) []string { - d.lock.RLock() - defer d.lock.RUnlock() - - return d.diskTags[dev] -} - -func (d *DiskTags) DevExist(dev string) bool { - d.lock.RLock() - defer d.lock.RUnlock() - - _, found := d.diskTags[dev] - return found -} +var CacheDiskTags *provisioner.DiskTags // Register register the block device CRD controller func Register( @@ -156,11 +65,8 @@ func Register( opt *option.Option, scanner *Scanner, ) error { - CacheDiskTags = &DiskTags{ - diskTags: make(map[string][]string), - lock: &sync.RWMutex{}, - initialized: false, - } + CacheDiskTags = provisioner.NewLonghornDiskTags() + semaphoreObj := provisioner.NewSemaphore(opt.MaxConcurrentOps) controller := &Controller{ Namespace: opt.Namespace, NodeName: opt.NodeName, @@ -170,7 +76,7 @@ func Register( BlockdeviceCache: bds.Cache(), BlockInfo: block, scanner: scanner, - semaphore: newSemaphore(opt.MaxConcurrentOps), + semaphore: semaphoreObj, } if err := scanner.Start(); err != nil { @@ -188,29 +94,27 @@ func Register( return nil } -func (c *Controller) handleDeviceRemove(device *diskv1.BlockDevice) error { - logrus.Infof("Prepare to stop provisioning device %s to node %s", device.Name, c.NodeName) - if err := c.unprovisionDeviceFromNode(device); err != nil { - err := fmt.Errorf("failed to stop provisioning device %s to node %s: %w", device.Name, c.NodeName, err) - logrus.Warnf("Removing disk %v error: %v", device.Name, err) - return err - } - return nil -} - // OnBlockDeviceChange watch the block device CR on change and performing disk operations // like mounting the disks to a desired path via ext4 func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (*diskv1.BlockDevice, error) { - if device == nil || device.DeletionTimestamp != nil || device.Spec.NodeName != c.NodeName { + if canSkipBlockDeviceChange(device, c.NodeName) { return nil, nil } + deviceCpy := device.DeepCopy() + provisionerInst, err := c.generateProvisioner(deviceCpy) + if err != nil { + logrus.Warnf("Failed to generate provisioner for device %s: %v", device.Name, err) + return nil, err + } + // handle remove device no matter inactive or corrupted, we will set `device.Spec.FileSystem.Provisioned` to false - if !device.Spec.FileSystem.Provisioned && device.Status.ProvisionPhase != diskv1.ProvisionPhaseUnprovisioned { - deviceCpy := device.DeepCopy() - if err := c.handleDeviceRemove(deviceCpy); err != nil { - diskv1.DiskAddedToNode.SetError(deviceCpy, "", err) - diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false) + if needProvisionerUnprovision(device) { + if requeue, err := provisionerInst.UnProvision(); requeue { + if err != nil { + diskv1.DiskAddedToNode.SetError(deviceCpy, "", err) + diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false) + } c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay()) } if !reflect.DeepEqual(device, deviceCpy) { @@ -220,423 +124,113 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) ( } // corrupted device could be skipped if we do not set ForceFormatted or Repaired - if device.Status.State == diskv1.BlockDeviceInactive { - return nil, nil - } - if device.Status.DeviceStatus.FileSystem.Corrupted && !device.Spec.FileSystem.ForceFormatted && !device.Spec.FileSystem.Repaired { + if deviceIsNotActiveOrCorrupted(device) { + logrus.Infof("Skip inactive or corrupted device %s", device.Name) return nil, nil } - if !CacheDiskTags.Initialized() { - return nil, errors.New(errorCacheDiskTagsNotInitialized) - } - - deviceCpy := device.DeepCopy() - devPath, err := resolvePersistentDevPath(device) + devPath, err := provisioner.ResolvePersistentDevPath(device) if err != nil { return nil, err } if devPath == "" { return nil, fmt.Errorf("failed to resolve persistent dev path for block device %s", device.Name) } - filesystem := c.BlockInfo.GetFileSystemInfoByDevPath(devPath) - devPathStatus := convertFSInfoToString(filesystem) - logrus.Debugf("Get filesystem info from device %s, %s", devPath, devPathStatus) - needFormat := deviceCpy.Spec.FileSystem.ForceFormatted && (deviceCpy.Status.DeviceStatus.FileSystem.Corrupted || deviceCpy.Status.DeviceStatus.FileSystem.LastFormattedAt == nil) - if needFormat { - logrus.Infof("Prepare to force format device %s", device.Name) - err := c.forceFormat(deviceCpy, devPath, filesystem) - if err != nil { - err := fmt.Errorf("failed to force format device %s: %s", device.Name, err.Error()) - logrus.Error(err) - diskv1.DeviceFormatting.SetError(deviceCpy, "", err) - diskv1.DeviceFormatting.SetStatusBool(deviceCpy, false) + if formatted, requeue, err := provisionerInst.Format(); !formatted { + if requeue { + c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay()) } if !reflect.DeepEqual(device, deviceCpy) { logrus.Debugf("Update block device %s for new formatting state", device.Name) return c.Blockdevices.Update(deviceCpy) } - return device, err - } - if needMountUpdate := needUpdateMountPoint(deviceCpy, filesystem); needMountUpdate != NeedMountUpdateNoOp { - err := c.updateDeviceMount(deviceCpy, devPath, filesystem, needMountUpdate) - if err != nil { - err := fmt.Errorf("failed to update device mount %s: %s", device.Name, err.Error()) - logrus.Error(err) - diskv1.DeviceMounted.SetError(deviceCpy, "", err) - diskv1.DeviceMounted.SetStatusBool(deviceCpy, false) - } - if !reflect.DeepEqual(device, deviceCpy) { - logrus.Debugf("Update block device %s for new formatting and mount state", device.Name) - return c.Blockdevices.Update(deviceCpy) - } return device, err } /* - * We use the needProvision to control first time provision. - * 1. `deviceCpy.Spec.FileSystem.Provisioned` is False. - * 2. updateDeviceStatus() would made `deviceCpy.Spec.FileSystem.Provisioned` be true and trigger Update - * 3. loop back and check `deviceCpy.Spec.FileSystem.Provisioned` again. (Now needProvision is true) - * 4. provision - * - * NOTE: we do not need to provision again for provisioned device so we should do another - * check with `device.Status.ProvisionPhase` + * Spec.Filesystem.Provisioned: What we desired to do + * Status.ProvisionPhase: What we are now + * 1. Spec.Filesystem.Provisioned = true, Status.ProvisionPhase = ProvisionPhaseProvisioned + * -> Already provisioned, do Update() + * 2. Spec.Filesystem.Provisioned = true, Status.ProvisionPhase = ProvisionPhaseUnprovisioned + * -> Provision the device */ - needProvision := deviceCpy.Spec.FileSystem.Provisioned - switch { - case needProvision && device.Status.ProvisionPhase == diskv1.ProvisionPhaseProvisioned: + if needProvisionerUpdate(device, deviceCpy) { logrus.Infof("Prepare to check the new device tags %v with device: %s", deviceCpy.Spec.Tags, device.Name) - DiskTagsSynced := gocommon.SliceContentCmp(deviceCpy.Spec.Tags, CacheDiskTags.GetDiskTags(device.Name)) - DiskTagsOnNodeMissed := func() bool { - node, err := c.NodeCache.Get(c.Namespace, c.NodeName) + if requeue, err := provisionerInst.Update(); requeue { if err != nil { - // dont check, just provision - return true - } - nodeDisk := node.Spec.Disks[device.Name] - for _, tag := range deviceCpy.Spec.Tags { - if !slices.Contains(nodeDisk.Tags, tag) { - return true - } - } - return false - } - if !DiskTagsSynced || (DiskTagsSynced && DiskTagsOnNodeMissed()) { - logrus.Debugf("Prepare to update device %s because the Tags changed, Spec: %v, CacheDiskTags: %v", deviceCpy.Name, deviceCpy.Spec.Tags, CacheDiskTags.GetDiskTags(device.Name)) - if err := c.provisionDeviceToNode(deviceCpy); err != nil { - err := fmt.Errorf("failed to update tags %v with device %s to node %s: %w", deviceCpy.Spec.Tags, device.Name, c.NodeName, err) - logrus.Error(err) - c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay()) + err := fmt.Errorf("failed to provision device %s to node %s: %w", device.Name, c.NodeName, err) + diskv1.DiskAddedToNode.SetError(deviceCpy, "", err) + diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false) } + c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay()) } - case needProvision && device.Status.ProvisionPhase == diskv1.ProvisionPhaseUnprovisioned: + } + + if needProvisionerProvision(device, deviceCpy) { logrus.Infof("Prepare to provision device %s to node %s", device.Name, c.NodeName) - if err := c.provisionDeviceToNode(deviceCpy); err != nil { - err := fmt.Errorf("failed to provision device %s to node %s: %w", device.Name, c.NodeName, err) - logrus.Error(err) - diskv1.DiskAddedToNode.SetError(deviceCpy, "", err) - diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false) + if requeue, err := provisionerInst.Provision(); requeue { + if err != nil { + err := fmt.Errorf("failed to provision device %s to node %s: %w", device.Name, c.NodeName, err) + diskv1.DiskAddedToNode.SetError(deviceCpy, "", err) + diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false) + } c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay()) + } } - if !reflect.DeepEqual(device, deviceCpy) { - logrus.Debugf("Update block device %s for new provision state", device.Name) - return c.Blockdevices.Update(deviceCpy) + return c.finalizeBlockDevice(device, deviceCpy, devPath) +} + +func (c *Controller) finalizeBlockDevice(oldBd, newBd *diskv1.BlockDevice, devPath string) (*diskv1.BlockDevice, error) { + if !reflect.DeepEqual(oldBd, newBd) { + logrus.Debugf("Update block device %s for new provision state", oldBd.Name) + return c.Blockdevices.Update(newBd) } // None of the above operations have resulted in an update to the device. // We therefore try to update the latest device status from the OS - if err := c.updateDeviceStatus(deviceCpy, devPath); err != nil { + if err := c.updateDeviceStatus(newBd, devPath); err != nil { return nil, err } - if !reflect.DeepEqual(device, deviceCpy) { - logrus.Debugf("Update block device %s for new device status", device.Name) - return c.Blockdevices.Update(deviceCpy) + if !reflect.DeepEqual(oldBd, newBd) { + logrus.Debugf("Update block device %s for new device status", oldBd.Name) + return c.Blockdevices.Update(newBd) } return nil, nil -} -func (c *Controller) updateDeviceMount(device *diskv1.BlockDevice, devPath string, filesystem *block.FileSystemInfo, needMountUpdate NeedMountUpdateOP) error { - logrus.Infof("Prepare to try %s", convertMountStr(needMountUpdate)) - if device.Status.DeviceStatus.Partitioned { - return fmt.Errorf("partitioned device is not supported, please use raw block device instead") - } - if needMountUpdate.Has(NeedMountUpdateUnmount) { - logrus.Infof("Unmount device %s from path %s", device.Name, filesystem.MountPoint) - if err := utils.UmountDisk(filesystem.MountPoint); err != nil { - return err - } - diskv1.DeviceMounted.SetError(device, "", nil) - diskv1.DeviceMounted.SetStatusBool(device, false) - } - if needMountUpdate.Has(NeedMountUpdateMount) { - expectedMountPoint := extraDiskMountPoint(device) - logrus.Infof("Mount deivce %s to %s", device.Name, expectedMountPoint) - if err := utils.MountDisk(devPath, expectedMountPoint); err != nil { - if utils.IsFSCorrupted(err) { - logrus.Errorf("Target device may be corrupted, update FS info.") - device.Status.DeviceStatus.FileSystem.Corrupted = true - device.Spec.FileSystem.Repaired = false - } - return err - } - diskv1.DeviceMounted.SetError(device, "", nil) - diskv1.DeviceMounted.SetStatusBool(device, true) - } - device.Status.DeviceStatus.FileSystem.Corrupted = false - return c.updateDeviceFileSystem(device, devPath) } -func (c *Controller) updateDeviceFileSystem(device *diskv1.BlockDevice, devPath string) error { - if device.Status.DeviceStatus.FileSystem.Corrupted { - // do not need to update other fields, we only need to update the corrupted flag - return nil - } - filesystem := c.BlockInfo.GetFileSystemInfoByDevPath(devPath) - if filesystem == nil { - return fmt.Errorf("failed to get filesystem info from devPath %s", devPath) +func (c *Controller) generateProvisioner(device *diskv1.BlockDevice) (provisioner.Provisioner, error) { + provisionerType := provisioner.TypeLonghornV1 + if device.Spec.Provisioner != "" { + provisionerType = device.Spec.Provisioner } - if filesystem.MountPoint != "" && filesystem.Type != "" && !utils.IsSupportedFileSystem(filesystem.Type) { - return fmt.Errorf("unsupported filesystem type %s", filesystem.Type) + switch provisionerType { + case provisioner.TypeLonghornV1: + return c.generateLHv1Provisioner(device) + case provisioner.TypeLonghornV2: + return nil, fmt.Errorf("TBD type %s", provisionerType) + case provisioner.TypeLVM: + return nil, fmt.Errorf("TBD type %s", provisionerType) } - - device.Status.DeviceStatus.FileSystem.MountPoint = filesystem.MountPoint - device.Status.DeviceStatus.FileSystem.Type = filesystem.Type - device.Status.DeviceStatus.FileSystem.IsReadOnly = filesystem.IsReadOnly - return nil -} - -func valueExists(value string) bool { - return value != "" && value != ghwutil.UNKNOWN + return nil, fmt.Errorf("unsupported provisioner type %s", provisionerType) } -// forceFormat simply formats the device to ext4 filesystem -// -// - umount the block device if it is mounted -// - create ext4 filesystem on the block device -func (c *Controller) forceFormat(device *diskv1.BlockDevice, devPath string, filesystem *block.FileSystemInfo) error { - if !c.semaphore.acquire() { - logrus.Infof("Hit maximum concurrent count. Requeue device %s", device.Name) - c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay()) - return nil - } - - defer c.semaphore.release() - - // umount the disk if it is mounted - if filesystem != nil && filesystem.MountPoint != "" { - logrus.Infof("unmount %s for %s", filesystem.MountPoint, device.Name) - if err := utils.UmountDisk(filesystem.MountPoint); err != nil { - return err - } - } - - // make ext4 filesystem format of the partition disk - logrus.Debugf("make ext4 filesystem format of device %s", device.Name) - // Reuse UUID if possible to make the filesystem UUID more stable. - // - // The reason filesystem UUID needs to be stable is that if a disk - // lacks WWN, NDM then needs a UUID to determine the unique identity - // of the blockdevice CR. - // - // We don't reuse WWN as UUID here because we assume that WWN is - // stable and permanent for a disk. Thefore, even if the underlying - // device gets formatted and the filesystem UUID changes, it still - // won't affect then unique identity of the blockdevice. - var uuid string - if !valueExists(device.Status.DeviceStatus.Details.WWN) { - uuid = device.Status.DeviceStatus.Details.UUID - if !valueExists(uuid) { - uuid = device.Status.DeviceStatus.Details.PtUUID - } - if !valueExists(uuid) { - // Reset the UUID to prevent "unknown" being passed down. - uuid = "" - } - } - if err := utils.MakeExt4DiskFormatting(devPath, uuid); err != nil { - return err - } - - // HACK: Update the UUID if it is reused. - // - // This makes the controller able to find then device after - // a PtUUID is reused in `mkfs.ext4` as filesystem UUID. - // - // If the UUID is not updated within one-stop, the next - // `OnBlockDeviceChange` is not able to find the device - // because `status.DeviceStatus.Details.UUID` is missing. - if uuid != "" { - device.Status.DeviceStatus.Details.UUID = uuid - } - - if err := c.updateDeviceFileSystem(device, devPath); err != nil { - return err - } - diskv1.DeviceFormatting.SetError(device, "", nil) - diskv1.DeviceFormatting.SetStatusBool(device, false) - diskv1.DeviceFormatting.Message(device, "Done device ext4 filesystem formatting") - device.Status.DeviceStatus.FileSystem.LastFormattedAt = &metav1.Time{Time: time.Now()} - device.Status.DeviceStatus.Partitioned = false - device.Status.DeviceStatus.FileSystem.Corrupted = false - return nil -} - -// provisionDeviceToNode adds a device to longhorn node as an additional disk. -func (c *Controller) provisionDeviceToNode(device *diskv1.BlockDevice) error { +func (c *Controller) generateLHv1Provisioner(device *diskv1.BlockDevice) (provisioner.Provisioner, error) { node, err := c.NodeCache.Get(c.Namespace, c.NodeName) if apierrors.IsNotFound(err) { node, err = c.Nodes.Get(c.Namespace, c.NodeName, metav1.GetOptions{}) } if err != nil { - return err - } - - nodeCpy := node.DeepCopy() - diskSpec := longhornv1.DiskSpec{ - Path: extraDiskMountPoint(device), - AllowScheduling: true, - EvictionRequested: false, - StorageReserved: 0, - Tags: device.Spec.Tags, - } - - updated := false - if disk, found := node.Spec.Disks[device.Name]; found { - respectedTags := []string{} - if disk.Tags != nil { - /* we should respect the disk Tags from LH */ - if CacheDiskTags.DevExist(device.Name) { - for _, tag := range disk.Tags { - if !slices.Contains(CacheDiskTags.GetDiskTags(device.Name), tag) { - respectedTags = append(respectedTags, tag) - } - } - } else { - respectedTags = disk.Tags - } - logrus.Debugf("Previous disk tags only on LH: %+v, we should respect it.", respectedTags) - diskSpec.Tags = gocommon.SliceDedupe(append(respectedTags, device.Spec.Tags...)) - updated = reflect.DeepEqual(disk, diskSpec) - } - } - // **NOTE** we do the `DiskAddedToNode` check here if we failed to update the device. - // That means the device status is not `Provisioned` but the LH node already has the disk. - // That we would not do next update, to make the device `Provisioned`. - if !updated || !diskv1.DiskAddedToNode.IsTrue(device) { - // not updated means empty or different, we should update it. - if !updated { - nodeCpy.Spec.Disks[device.Name] = diskSpec - if _, err = c.Nodes.Update(nodeCpy); err != nil { - return err - } - } - - if !diskv1.DiskAddedToNode.IsTrue(device) { - // Update if needed. If the info is alreay there, no need to update. - msg := fmt.Sprintf("Added disk %s to longhorn node `%s` as an additional disk", device.Name, node.Name) - device.Status.ProvisionPhase = diskv1.ProvisionPhaseProvisioned - diskv1.DiskAddedToNode.SetError(device, "", nil) - diskv1.DiskAddedToNode.SetStatusBool(device, true) - diskv1.DiskAddedToNode.Message(device, msg) - } - } - - // update oldDiskTags - CacheDiskTags.UpdateDiskTags(device.Name, device.Spec.Tags) - - return nil -} - -// unprovisionDeviceFromNode removes a device from a longhorn node. -func (c *Controller) unprovisionDeviceFromNode(device *diskv1.BlockDevice) error { - node, err := c.Nodes.Get(c.Namespace, c.NodeName, metav1.GetOptions{}) - if err != nil { - if apierrors.IsNotFound(err) { - // Skip since the node is not there. - return nil - } - return err - } - - updateProvisionPhaseUnprovisioned := func() { - msg := fmt.Sprintf("Disk not in longhorn node `%s`", c.NodeName) - device.Status.ProvisionPhase = diskv1.ProvisionPhaseUnprovisioned - diskv1.DiskAddedToNode.SetError(device, "", nil) - diskv1.DiskAddedToNode.SetStatusBool(device, false) - diskv1.DiskAddedToNode.Message(device, msg) - } - - removeDiskFromNode := func() error { - nodeCpy := node.DeepCopy() - delete(nodeCpy.Spec.Disks, device.Name) - if _, err := c.Nodes.Update(nodeCpy); err != nil { - return err - } - return nil - } - - isValidateToDelete := func(lhDisk longhornv1.DiskSpec) bool { - return !lhDisk.AllowScheduling - } - - diskToRemove, ok := node.Spec.Disks[device.Name] - if !ok { - logrus.Infof("disk %s not in disks of longhorn node %s/%s", device.Name, c.Namespace, c.NodeName) - updateProvisionPhaseUnprovisioned() - return nil - } - - isUnprovisioning := false - for _, tag := range device.Status.Tags { - if tag == utils.DiskRemoveTag { - isUnprovisioning = true - break - } - } - - // for inactive/corrupted disk, we could remove it from node directly - if isUnprovisioning && isValidateToDelete(diskToRemove) && - (device.Status.State == diskv1.BlockDeviceInactive || device.Status.DeviceStatus.FileSystem.Corrupted) { - logrus.Infof("disk (%s) is inactive or corrupted, remove it from node directly", device.Name) - // handle mountpoint first - filesystem := c.BlockInfo.GetFileSystemInfoByDevPath(device.Status.DeviceStatus.DevPath) - if filesystem != nil && filesystem.MountPoint != "" { - timeout := 30 * time.Second - if err := utils.ForceUmountWithTimeout(filesystem.MountPoint, timeout); err != nil { - logrus.Warnf("Force umount %v error: %v", filesystem.MountPoint, err) - } - // reset related fields - c.updateDeviceFileSystem(device, device.Status.DeviceStatus.DevPath) - device.Spec.Tags = []string{} - device.Status.Tags = []string{} - } - // remove the disk from node - if err := removeDiskFromNode(); err != nil { - return err - } - updateProvisionPhaseUnprovisioned() - return nil - } - - if isUnprovisioning { - if status, ok := node.Status.DiskStatus[device.Name]; ok && len(status.ScheduledReplica) == 0 { - // Unprovision finished. Remove the disk. - if err := removeDiskFromNode(); err != nil { - return err - } - updateProvisionPhaseUnprovisioned() - logrus.Debugf("device %s is unprovisioned", device.Name) - } else { - // Still unprovisioning - c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay()) - logrus.Debugf("device %s is unprovisioning, status: %+v, ScheduledReplica: %d", device.Name, node.Status.DiskStatus[device.Name], len(status.ScheduledReplica)) - } - } else { - // Start unprovisioing - logrus.Debugf("Setup device %s to start unprovision", device.Name) - diskToRemove.AllowScheduling = false - diskToRemove.EvictionRequested = true - diskToRemove.Tags = append(diskToRemove.Tags, utils.DiskRemoveTag) - nodeCpy := node.DeepCopy() - nodeCpy.Spec.Disks[device.Name] = diskToRemove - if _, err := c.Nodes.Update(nodeCpy); err != nil { - return err - } - msg := fmt.Sprintf("Stop provisioning device %s to longhorn node `%s`", device.Name, c.NodeName) - device.Status.ProvisionPhase = diskv1.ProvisionPhaseUnprovisioning - diskv1.DiskAddedToNode.SetError(device, "", nil) - diskv1.DiskAddedToNode.SetStatusBool(device, false) - diskv1.DiskAddedToNode.Message(device, msg) + return nil, err } - - return nil + return provisioner.NewLHV1Provisioner(device, c.BlockInfo, node, c.Nodes, c.NodeCache, CacheDiskTags, c.semaphore) } func (c *Controller) updateDeviceStatus(device *diskv1.BlockDevice, devPath string) error { @@ -689,7 +283,7 @@ func (c *Controller) updateDeviceStatus(device *diskv1.BlockDevice, devPath stri func (c *Controller) OnBlockDeviceDelete(_ string, device *diskv1.BlockDevice) (*diskv1.BlockDevice, error) { if !CacheDiskTags.Initialized() { - return nil, errors.New(errorCacheDiskTagsNotInitialized) + return nil, errors.New(provisioner.ErrorCacheDiskTagsNotInitialized) } if device == nil { @@ -747,85 +341,6 @@ func (c *Controller) OnBlockDeviceDelete(_ string, device *diskv1.BlockDevice) ( return nil, nil } -func resolvePersistentDevPath(device *diskv1.BlockDevice) (string, error) { - switch device.Status.DeviceStatus.Details.DeviceType { - case diskv1.DeviceTypeDisk: - // Disk naming priority. - // #1 WWN - // #2 filesystem UUID (UUID) - // #3 partition table UUID (PTUUID) - // #4 PtUUID as UUID to query disk info - // (NDM might reuse PtUUID as UUID to format a disk) - if wwn := device.Status.DeviceStatus.Details.WWN; valueExists(wwn) { - if device.Status.DeviceStatus.Details.StorageController == string(diskv1.StorageControllerNVMe) { - return filepath.EvalSymlinks("/dev/disk/by-id/nvme-" + wwn) - } - return filepath.EvalSymlinks("/dev/disk/by-id/wwn-" + wwn) - } - if fsUUID := device.Status.DeviceStatus.Details.UUID; valueExists(fsUUID) { - path, err := filepath.EvalSymlinks("/dev/disk/by-uuid/" + fsUUID) - if err == nil { - return path, nil - } - if !errors.Is(err, os.ErrNotExist) { - return "", err - } - } - - if ptUUID := device.Status.DeviceStatus.Details.PtUUID; valueExists(ptUUID) { - path, err := block.GetDevPathByPTUUID(ptUUID) - if err != nil { - return "", err - } - if path != "" { - return path, nil - } - return filepath.EvalSymlinks("/dev/disk/by-uuid/" + ptUUID) - } - return "", fmt.Errorf("WWN/UUID/PTUUID was not found on device %s", device.Name) - case diskv1.DeviceTypePart: - partUUID := device.Status.DeviceStatus.Details.PartUUID - if partUUID == "" { - return "", fmt.Errorf("PARTUUID was not found on device %s", device.Name) - } - return filepath.EvalSymlinks("/dev/disk/by-partuuid/" + partUUID) - default: - return "", nil - } -} - -func extraDiskMountPoint(bd *diskv1.BlockDevice) string { - // DEPRECATED: only for backward compatibility - if bd.Spec.FileSystem.MountPoint != "" { - return bd.Spec.FileSystem.MountPoint - } - - return fmt.Sprintf("/var/lib/harvester/extra-disks/%s", bd.Name) -} - -func needUpdateMountPoint(bd *diskv1.BlockDevice, filesystem *block.FileSystemInfo) NeedMountUpdateOP { - if filesystem == nil { - logrus.Debugf("Filesystem is not ready, skip the mount operation") - return NeedMountUpdateNoOp - } - - logrus.Debugf("Checking mount operation with FS.Provisioned %v, FS.Mountpoint %s", bd.Spec.FileSystem.Provisioned, filesystem.MountPoint) - if bd.Spec.FileSystem.Provisioned { - if filesystem.MountPoint == "" { - return NeedMountUpdateMount - } - if filesystem.MountPoint == extraDiskMountPoint(bd) { - logrus.Debugf("Already mounted, return no-op") - return NeedMountUpdateNoOp - } - return NeedMountUpdateUnmount | NeedMountUpdateMount - } - if filesystem.MountPoint != "" { - return NeedMountUpdateUnmount - } - return NeedMountUpdateNoOp -} - // jitterEnqueueDelay returns a random duration between 3 to 7. func jitterEnqueueDelay() time.Duration { enqueueDelay := 5 @@ -836,32 +351,23 @@ func jitterEnqueueDelay() time.Duration { return time.Duration(int(randNum)+enqueueDelay) * time.Second } -func convertMountStr(mountOP NeedMountUpdateOP) string { - switch mountOP { - case NeedMountUpdateNoOp: - return "No-Op" - case NeedMountUpdateMount: - return "Mount" - case NeedMountUpdateUnmount: - return "Unmount" - } - return "Unknown OP" +func deviceIsNotActiveOrCorrupted(device *diskv1.BlockDevice) bool { + return device.Status.State == diskv1.BlockDeviceInactive || + (device.Status.DeviceStatus.FileSystem.Corrupted && !device.Spec.FileSystem.ForceFormatted && !device.Spec.FileSystem.Repaired) } -func convertFSInfoToString(fsInfo *block.FileSystemInfo) string { - // means this device is not mounted - if fsInfo.MountPoint == "" { - return "device is not mounted" - } - return fmt.Sprintf("mountpoint: %s, fsType: %s", fsInfo.MountPoint, fsInfo.Type) +func canSkipBlockDeviceChange(device *diskv1.BlockDevice, nodeName string) bool { + return device == nil || device.DeletionTimestamp != nil || device.Spec.NodeName != nodeName } -func removeUnNeeded[T string | int](x []T, y []T) []T { - result := make([]T, 0) - for _, item := range x { - if !slices.Contains(y, item) { - result = append(result, item) - } - } - return result +func needProvisionerUnprovision(device *diskv1.BlockDevice) bool { + return !device.Spec.FileSystem.Provisioned && device.Status.ProvisionPhase != diskv1.ProvisionPhaseUnprovisioned +} + +func needProvisionerUpdate(oldBd, newBd *diskv1.BlockDevice) bool { + return oldBd.Status.ProvisionPhase == diskv1.ProvisionPhaseProvisioned && newBd.Spec.FileSystem.Provisioned +} + +func needProvisionerProvision(oldBd, newBd *diskv1.BlockDevice) bool { + return oldBd.Status.ProvisionPhase == diskv1.ProvisionPhaseUnprovisioned && newBd.Spec.FileSystem.Provisioned } diff --git a/pkg/provisioner/common.go b/pkg/provisioner/common.go new file mode 100644 index 00000000..e7fbd830 --- /dev/null +++ b/pkg/provisioner/common.go @@ -0,0 +1,219 @@ +package provisioner + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "sync" + + ghwutil "github.com/jaypipes/ghw/pkg/util" + "github.com/sirupsen/logrus" + + diskv1 "github.com/harvester/node-disk-manager/pkg/apis/harvesterhci.io/v1beta1" + "github.com/harvester/node-disk-manager/pkg/block" +) + +type NeedMountUpdateOP int8 + +const ( + TypeLonghornV1 = "Longhornv1" + TypeLonghornV2 = "Longhornv2" + TypeLVM = "LVM" + + // longhorn disk tags + ErrorCacheDiskTagsNotInitialized = "CacheDiskTags is not initialized" + + // longhorn MountStatus + NeedMountUpdateNoOp NeedMountUpdateOP = 1 << iota + NeedMountUpdateMount + NeedMountUpdateUnmount +) + +func (f NeedMountUpdateOP) Has(flag NeedMountUpdateOP) bool { + return f&flag != 0 +} + +type Provisioner interface { + Format() (bool, bool, error) + UnFormat() (bool, error) + Provision() (bool, error) + UnProvision() (bool, error) + Update() (bool, error) + GetProvisionerName() string +} + +func setCondDiskAddedToNodeFalse(device *diskv1.BlockDevice, message string, targetStatus diskv1.BlockDeviceProvisionPhase) { + device.Status.ProvisionPhase = targetStatus + diskv1.DiskAddedToNode.SetError(device, "", nil) + diskv1.DiskAddedToNode.SetStatusBool(device, false) + diskv1.DiskAddedToNode.Message(device, message) +} + +func setCondDiskAddedToNodeTrue(device *diskv1.BlockDevice, message string, targetStatus diskv1.BlockDeviceProvisionPhase) { + device.Status.ProvisionPhase = targetStatus + diskv1.DiskAddedToNode.SetError(device, "", nil) + diskv1.DiskAddedToNode.SetStatusBool(device, true) + diskv1.DiskAddedToNode.Message(device, message) +} + +func SetCondDeviceFormattingFail(device *diskv1.BlockDevice, err error) { + diskv1.DeviceFormatting.SetError(device, "", err) + diskv1.DeviceFormatting.SetStatusBool(device, false) +} + +// DiskTags is a cache mechanism for the blockdevices Tags (spec.Tags), it only changed from Harvester side. +type DiskTags struct { + diskTags map[string][]string + lock *sync.RWMutex + initialized bool +} + +func NewLonghornDiskTags() *DiskTags { + return &DiskTags{ + diskTags: make(map[string][]string), + lock: &sync.RWMutex{}, + initialized: false, + } +} + +func (d *DiskTags) DeleteDiskTags(dev string) { + d.lock.Lock() + defer d.lock.Unlock() + + delete(d.diskTags, dev) +} + +func (d *DiskTags) UpdateDiskTags(dev string, tags []string) { + d.lock.Lock() + defer d.lock.Unlock() + + d.diskTags[dev] = tags +} + +func (d *DiskTags) UpdateInitialized() { + d.lock.Lock() + defer d.lock.Unlock() + + d.initialized = true +} + +func (d *DiskTags) Initialized() bool { + d.lock.RLock() + defer d.lock.RUnlock() + + return d.initialized +} + +func (d *DiskTags) GetDiskTags(dev string) []string { + d.lock.RLock() + defer d.lock.RUnlock() + + return d.diskTags[dev] +} + +func (d *DiskTags) DevExist(dev string) bool { + d.lock.RLock() + defer d.lock.RUnlock() + + _, found := d.diskTags[dev] + return found +} + +// semaphore is a simple semaphore implementation in channel +type Semaphore struct { + ch chan struct{} +} + +// newSemaphore creates a new semaphore with the given capacity. +func NewSemaphore(n uint) *Semaphore { + return &Semaphore{ + ch: make(chan struct{}, n), + } +} + +// acquire a semaphore to prevent concurrent update +func (s *Semaphore) acquire() bool { + logrus.Debugf("Pre-acquire channel stats: %d/%d", len(s.ch), cap(s.ch)) + select { + case s.ch <- struct{}{}: + return true + default: + // full + return false + } +} + +// release the semaphore +func (s *Semaphore) release() bool { + select { + case <-s.ch: + return true + default: + // empty + return false + } +} + +func valueExists(value string) bool { + return value != "" && value != ghwutil.UNKNOWN +} + +func convertMountStr(mountOP NeedMountUpdateOP) string { + switch mountOP { + case NeedMountUpdateNoOp: + return "No-Op" + case NeedMountUpdateMount: + return "Mount" + case NeedMountUpdateUnmount: + return "Unmount" + } + return "Unknown OP" +} + +func ResolvePersistentDevPath(device *diskv1.BlockDevice) (string, error) { + switch device.Status.DeviceStatus.Details.DeviceType { + case diskv1.DeviceTypeDisk: + // Disk naming priority. + // #1 WWN + // #2 filesystem UUID (UUID) + // #3 partition table UUID (PTUUID) + // #4 PtUUID as UUID to query disk info + // (NDM might reuse PtUUID as UUID to format a disk) + if wwn := device.Status.DeviceStatus.Details.WWN; valueExists(wwn) { + if device.Status.DeviceStatus.Details.StorageController == string(diskv1.StorageControllerNVMe) { + return filepath.EvalSymlinks("/dev/disk/by-id/nvme-" + wwn) + } + return filepath.EvalSymlinks("/dev/disk/by-id/wwn-" + wwn) + } + if fsUUID := device.Status.DeviceStatus.Details.UUID; valueExists(fsUUID) { + path, err := filepath.EvalSymlinks("/dev/disk/by-uuid/" + fsUUID) + if err == nil { + return path, nil + } + if !errors.Is(err, os.ErrNotExist) { + return "", err + } + } + + if ptUUID := device.Status.DeviceStatus.Details.PtUUID; valueExists(ptUUID) { + path, err := block.GetDevPathByPTUUID(ptUUID) + if err != nil { + return "", err + } + if path != "" { + return path, nil + } + return filepath.EvalSymlinks("/dev/disk/by-uuid/" + ptUUID) + } + return "", fmt.Errorf("WWN/UUID/PTUUID was not found on device %s", device.Name) + case diskv1.DeviceTypePart: + partUUID := device.Status.DeviceStatus.Details.PartUUID + if partUUID == "" { + return "", fmt.Errorf("PARTUUID was not found on device %s", device.Name) + } + return filepath.EvalSymlinks("/dev/disk/by-partuuid/" + partUUID) + default: + return "", nil + } +} diff --git a/pkg/provisioner/longhornv1.go b/pkg/provisioner/longhornv1.go new file mode 100644 index 00000000..c31a4721 --- /dev/null +++ b/pkg/provisioner/longhornv1.go @@ -0,0 +1,463 @@ +package provisioner + +import ( + "errors" + "fmt" + "reflect" + "slices" + "time" + + gocommon "github.com/harvester/go-common" + longhornv1 "github.com/longhorn/longhorn-manager/k8s/pkg/apis/longhorn/v1beta2" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + diskv1 "github.com/harvester/node-disk-manager/pkg/apis/harvesterhci.io/v1beta1" + "github.com/harvester/node-disk-manager/pkg/block" + ctllonghornv1 "github.com/harvester/node-disk-manager/pkg/generated/controllers/longhorn.io/v1beta2" + "github.com/harvester/node-disk-manager/pkg/utils" +) + +type LonghornV1Provisioner struct { + name string + blockInfo block.Info + device *diskv1.BlockDevice + nodeObj *longhornv1.Node + nodesClientCache ctllonghornv1.NodeCache + nodesClient ctllonghornv1.NodeClient + + cacheDiskTags *DiskTags + semaphoreObj *Semaphore +} + +func NewLHV1Provisioner( + device *diskv1.BlockDevice, + block block.Info, + nodeObj *longhornv1.Node, + nodesClient ctllonghornv1.NodeClient, + nodesClientCache ctllonghornv1.NodeCache, + cacheDiskTags *DiskTags, + semaphore *Semaphore, +) (*LonghornV1Provisioner, error) { + provisioner := &LonghornV1Provisioner{ + name: TypeLonghornV1, + device: device, + blockInfo: block, + nodeObj: nodeObj, + nodesClient: nodesClient, + nodesClientCache: nodesClientCache, + cacheDiskTags: cacheDiskTags, + semaphoreObj: semaphore, + } + + if !cacheDiskTags.Initialized() { + return nil, errors.New(ErrorCacheDiskTagsNotInitialized) + } + return provisioner, nil +} + +func (p *LonghornV1Provisioner) GetProvisionerName() string { + return p.name +} + +func (p *LonghornV1Provisioner) Provision() (bool, error) { + logrus.Infof("%s provisioning Longhorn block device %s", p.name, p.device.Name) + + nodeObjCpy := p.nodeObj.DeepCopy() + tags := []string{} + if p.device.Spec.Tags != nil { + tags = p.device.Spec.Tags + } + diskSpec := longhornv1.DiskSpec{ + Type: longhornv1.DiskTypeFilesystem, + Path: extraDiskMountPoint(p.device), + AllowScheduling: true, + EvictionRequested: false, + StorageReserved: 0, + Tags: tags, + } + + // checked the case that longhorn node updated but blockdevice CRD is not updated + synced := false + provisioned := false + if disk, found := p.nodeObj.Spec.Disks[p.device.Name]; found { + synced = reflect.DeepEqual(disk, diskSpec) + if !synced { + logrus.Warnf("The disk spec should not different between longhorn node and blockdevice CRD, disk: %+v, diskSpec: %+v", disk, diskSpec) + } + } + if !synced { + logrus.Debugf("Try to sync disk %s to longhorn node %s/%s again", p.device.Name, p.nodeObj.Namespace, p.nodeObj.Name) + nodeObjCpy.Spec.Disks[p.device.Name] = diskSpec + if _, err := p.nodesClient.Update(nodeObjCpy); err != nil { + return true, err + } + provisioned = true + } + + if (synced && !diskv1.DiskAddedToNode.IsTrue(p.device)) || provisioned { + logrus.Debugf("Set blockdevice CRD (%v) to provisioned", p.device) + msg := fmt.Sprintf("Added disk %s to longhorn node `%s` as an additional disk", p.device.Name, p.nodeObj.Name) + setCondDiskAddedToNodeTrue(p.device, msg, diskv1.ProvisionPhaseProvisioned) + } + + p.cacheDiskTags.UpdateDiskTags(p.device.Name, p.device.Spec.Tags) + return false, nil +} + +func (p *LonghornV1Provisioner) UnProvision() (bool, error) { + logrus.Infof("%s unprovisioning Longhorn block device %s", p.name, p.device.Name) + + // inner functions + updateProvisionPhaseUnprovisioned := func() { + msg := fmt.Sprintf("Disk not in longhorn node `%s`", p.nodeObj.Name) + setCondDiskAddedToNodeFalse(p.device, msg, diskv1.ProvisionPhaseUnprovisioned) + } + + removeDiskFromNode := func() error { + nodeCpy := p.nodeObj.DeepCopy() + delete(nodeCpy.Spec.Disks, p.device.Name) + if _, err := p.nodesClient.Update(nodeCpy); err != nil { + return err + } + return nil + } + + isValidateToDelete := func(lhDisk longhornv1.DiskSpec) bool { + return !lhDisk.AllowScheduling + } + + diskToRemove, ok := p.nodeObj.Spec.Disks[p.device.Name] + if !ok { + logrus.Infof("disk %s not in disks of longhorn node %s/%s", p.device.Name, p.nodeObj.Namespace, p.nodeObj.Name) + updateProvisionPhaseUnprovisioned() + return false, nil + } + + isUnprovisioning := false + for _, tag := range p.device.Status.Tags { + if tag == utils.DiskRemoveTag { + isUnprovisioning = true + break + } + } + + // for inactive/corrupted disk, we could remove it from node directly + if isUnprovisioning && isValidateToDelete(diskToRemove) && + (p.device.Status.State == diskv1.BlockDeviceInactive || p.device.Status.DeviceStatus.FileSystem.Corrupted) { + logrus.Infof("disk (%s) is inactive or corrupted, remove it from node directly", p.device.Name) + p.unmountTheBrokenDisk() + + if err := removeDiskFromNode(); err != nil { + return true, err + } + updateProvisionPhaseUnprovisioned() + return false, nil + } + + if isUnprovisioning { + if status, ok := p.nodeObj.Status.DiskStatus[p.device.Name]; ok && len(status.ScheduledReplica) == 0 { + // Unprovision finished. Remove the disk. + if err := removeDiskFromNode(); err != nil { + return true, err + } + updateProvisionPhaseUnprovisioned() + logrus.Debugf("device %s is unprovisioned", p.device.Name) + } else { + // Still unprovisioning + logrus.Debugf("device %s is unprovisioning, status: %+v, ScheduledReplica: %d", p.device.Name, p.nodeObj.Status.DiskStatus[p.device.Name], len(status.ScheduledReplica)) + return true, nil + } + } else { + // Start unprovisioing + if err := p.excludeTheDisk(diskToRemove); err != nil { + return true, err + } + msg := fmt.Sprintf("Stop provisioning device %s to longhorn node `%s`", p.device.Name, p.nodeObj.Name) + setCondDiskAddedToNodeFalse(p.device, msg, diskv1.ProvisionPhaseUnprovisioning) + } + + return false, nil + +} + +func (p *LonghornV1Provisioner) unmountTheBrokenDisk() { + filesystem := p.blockInfo.GetFileSystemInfoByDevPath(p.device.Status.DeviceStatus.DevPath) + if filesystem != nil && filesystem.MountPoint != "" { + if err := utils.ForceUmountWithTimeout(filesystem.MountPoint, 30*time.Second); err != nil { + logrus.Warnf("Force umount %v error: %v", filesystem.MountPoint, err) + } + // reset related fields + p.updateDeviceFileSystem(p.device, p.device.Status.DeviceStatus.DevPath) + p.device.Spec.Tags = []string{} + p.device.Status.Tags = []string{} + } +} + +func (p *LonghornV1Provisioner) excludeTheDisk(targetDisk longhornv1.DiskSpec) error { + logrus.Debugf("Setup device %s to start unprovision", p.device.Name) + targetDisk.AllowScheduling = false + targetDisk.EvictionRequested = true + targetDisk.Tags = append(targetDisk.Tags, utils.DiskRemoveTag) + nodeCpy := p.nodeObj.DeepCopy() + nodeCpy.Spec.Disks[p.device.Name] = targetDisk + if _, err := p.nodesClient.Update(nodeCpy); err != nil { + return err + } + return nil +} + +// Update is used to update the disk tags +func (p *LonghornV1Provisioner) Update() (bool, error) { + + DiskTagsOnNodeMissed := func(targetDisk longhornv1.DiskSpec) bool { + for _, tag := range p.device.Spec.Tags { + if !slices.Contains(targetDisk.Tags, tag) { + return true + } + } + return false + } + + logrus.Infof("%s updating Longhorn block device %s", p.name, p.device.Name) + targetDisk, found := p.nodeObj.Spec.Disks[p.device.Name] + if !found { + logrus.Warnf("disk %s not in disks of longhorn node, was it already provisioned?", p.device.Name) + return false, nil + } + DiskTagsSynced := gocommon.SliceContentCmp(p.device.Spec.Tags, p.cacheDiskTags.GetDiskTags(p.device.Name)) + if !DiskTagsSynced || (DiskTagsSynced && DiskTagsOnNodeMissed(targetDisk)) { + // The final tags: DiskSpec.Tags - DiskCacheTags + Device.Spec.Tags + logrus.Debugf("Prepare to update device %s because the Tags changed, Spec: %v, CacheDiskTags: %v", p.device.Name, p.device.Spec.Tags, p.cacheDiskTags.GetDiskTags(p.device.Name)) + respectedTags := []string{} + for _, tag := range targetDisk.Tags { + if !slices.Contains(p.cacheDiskTags.GetDiskTags(p.device.Name), tag) { + respectedTags = append(respectedTags, tag) + } + } + targetDisk.Tags = gocommon.SliceDedupe(append(respectedTags, p.device.Spec.Tags...)) + nodeCpy := p.nodeObj.DeepCopy() + nodeCpy.Spec.Disks[p.device.Name] = targetDisk + if _, err := p.nodesClient.Update(nodeCpy); err != nil { + return true, err + } + } + p.cacheDiskTags.UpdateDiskTags(p.device.Name, p.device.Spec.Tags) + return false, nil +} + +func (p *LonghornV1Provisioner) Format() (bool, bool, error) { + logrus.Infof("%s formatting Longhorn block device %s", p.name, p.device.Name) + formatted := false + requeue := false + + devPath, err := ResolvePersistentDevPath(p.device) + if err != nil { + return formatted, requeue, err + } + if devPath == "" { + return formatted, requeue, fmt.Errorf("failed to resolve persistent dev path for block device %s", p.device.Name) + } + filesystem := p.blockInfo.GetFileSystemInfoByDevPath(devPath) + devPathStatus := convertFSInfoToString(filesystem) + logrus.Debugf("Get filesystem info from device %s, %s", devPath, devPathStatus) + if p.needFormat() { + logrus.Infof("Prepare to force format device %s", p.device.Name) + requeue, err = p.forceFormatFS(p.device, devPath, filesystem) + if err != nil { + err := fmt.Errorf("failed to force format device %s: %s", p.device.Name, err.Error()) + diskv1.DeviceFormatting.SetError(p.device, "", err) + diskv1.DeviceFormatting.SetStatusBool(p.device, false) + return formatted, requeue, err + } + return formatted, requeue, nil + } + + if needMountUpdate := needUpdateMountPoint(p.device, filesystem); needMountUpdate != NeedMountUpdateNoOp { + err := p.updateDeviceMount(p.device, devPath, filesystem, needMountUpdate) + if err != nil { + err := fmt.Errorf("failed to update device mount %s: %s", p.device.Name, err.Error()) + diskv1.DeviceMounted.SetError(p.device, "", err) + diskv1.DeviceMounted.SetStatusBool(p.device, false) + } + return formatted, requeue, err + } + formatted = true + return true, false, nil +} + +func (p *LonghornV1Provisioner) UnFormat() (bool, error) { + logrus.Infof("%s unformatting Longhorn block device %s", p.name, p.device.Name) + return false, nil +} + +func (p *LonghornV1Provisioner) updateDeviceMount(device *diskv1.BlockDevice, devPath string, filesystem *block.FileSystemInfo, needMountUpdate NeedMountUpdateOP) error { + logrus.Infof("Prepare to try %s", convertMountStr(needMountUpdate)) + if device.Status.DeviceStatus.Partitioned { + return fmt.Errorf("partitioned device is not supported, please use raw block device instead") + } + if needMountUpdate.Has(NeedMountUpdateUnmount) { + logrus.Infof("Unmount device %s from path %s", device.Name, filesystem.MountPoint) + if err := utils.UmountDisk(filesystem.MountPoint); err != nil { + return err + } + diskv1.DeviceMounted.SetError(device, "", nil) + diskv1.DeviceMounted.SetStatusBool(device, false) + } + if needMountUpdate.Has(NeedMountUpdateMount) { + expectedMountPoint := extraDiskMountPoint(device) + logrus.Infof("Mount deivce %s to %s", device.Name, expectedMountPoint) + if err := utils.MountDisk(devPath, expectedMountPoint); err != nil { + if utils.IsFSCorrupted(err) { + logrus.Errorf("Target device may be corrupted, update FS info.") + device.Status.DeviceStatus.FileSystem.Corrupted = true + device.Spec.FileSystem.Repaired = false + } + return err + } + diskv1.DeviceMounted.SetError(device, "", nil) + diskv1.DeviceMounted.SetStatusBool(device, true) + } + device.Status.DeviceStatus.FileSystem.Corrupted = false + return p.updateDeviceFileSystem(device, devPath) +} + +func (p *LonghornV1Provisioner) needFormat() bool { + return p.device.Spec.FileSystem.ForceFormatted && + (p.device.Status.DeviceStatus.FileSystem.Corrupted || p.device.Status.DeviceStatus.FileSystem.LastFormattedAt == nil) +} + +// forceFormat simply formats the device to ext4 filesystem +// +// - umount the block device if it is mounted +// - create ext4 filesystem on the block device +func (p *LonghornV1Provisioner) forceFormatFS(device *diskv1.BlockDevice, devPath string, filesystem *block.FileSystemInfo) (bool, error) { + if !p.semaphoreObj.acquire() { + logrus.Infof("Hit maximum concurrent count. Requeue device %s", device.Name) + return true, nil + } + + defer p.semaphoreObj.release() + + // before format, we need to unmount the device if it is mounted + if filesystem != nil && filesystem.MountPoint != "" { + logrus.Infof("unmount %s for %s", filesystem.MountPoint, device.Name) + if err := utils.UmountDisk(filesystem.MountPoint); err != nil { + return false, err + } + } + + // ***TODO***: we should let people to use ext4 or xfs, but now... + // make ext4 filesystem format of the partition disk + logrus.Debugf("make ext4 filesystem format of device %s", device.Name) + + // Reuse UUID if possible to make the filesystem UUID more stable. + // + // **NOTE**: We should highly depends on the WWN, the filesystem UUID + // is not stable because it store in the filesystem. + // + // The reason filesystem UUID needs to be stable is that if a disk + // lacks WWN, NDM then needs a UUID to determine the unique identity + // of the blockdevice CR. + // + // We don't reuse WWN as UUID here because we assume that WWN is + // stable and permanent for a disk. Thefore, even if the underlying + // device gets formatted and the filesystem UUID changes, it still + // won't affect then unique identity of the blockdevice. + var uuid string + if !valueExists(device.Status.DeviceStatus.Details.WWN) { + uuid = device.Status.DeviceStatus.Details.UUID + if !valueExists(uuid) { + uuid = device.Status.DeviceStatus.Details.PtUUID + } + if !valueExists(uuid) { + // Reset the UUID to prevent "unknown" being passed down. + uuid = "" + } + } + if err := utils.MakeExt4DiskFormatting(devPath, uuid); err != nil { + return false, err + } + + // HACK: Update the UUID if it is reused. + // + // This makes the controller able to find then device after + // a PtUUID is reused in `mkfs.ext4` as filesystem UUID. + // + // If the UUID is not updated within one-stop, the next + // `OnBlockDeviceChange` is not able to find the device + // because `status.DeviceStatus.Details.UUID` is missing. + if uuid != "" { + device.Status.DeviceStatus.Details.UUID = uuid + } + + if err := p.updateDeviceFileSystem(device, devPath); err != nil { + return false, err + } + diskv1.DeviceFormatting.SetError(device, "", nil) + diskv1.DeviceFormatting.SetStatusBool(device, false) + diskv1.DeviceFormatting.Message(device, "Done device ext4 filesystem formatting") + device.Status.DeviceStatus.FileSystem.LastFormattedAt = &metav1.Time{Time: time.Now()} + device.Status.DeviceStatus.Partitioned = false + device.Status.DeviceStatus.FileSystem.Corrupted = false + return false, nil +} + +func (p *LonghornV1Provisioner) updateDeviceFileSystem(device *diskv1.BlockDevice, devPath string) error { + if device.Status.DeviceStatus.FileSystem.Corrupted { + // do not need to update other fields, we only need to update the corrupted flag + return nil + } + filesystem := p.blockInfo.GetFileSystemInfoByDevPath(devPath) + if filesystem == nil { + return fmt.Errorf("failed to get filesystem info from devPath %s", devPath) + } + if filesystem.MountPoint != "" && filesystem.Type != "" && !utils.IsSupportedFileSystem(filesystem.Type) { + return fmt.Errorf("unsupported filesystem type %s", filesystem.Type) + } + + device.Status.DeviceStatus.FileSystem.MountPoint = filesystem.MountPoint + device.Status.DeviceStatus.FileSystem.Type = filesystem.Type + device.Status.DeviceStatus.FileSystem.IsReadOnly = filesystem.IsReadOnly + return nil +} + +func extraDiskMountPoint(bd *diskv1.BlockDevice) string { + // DEPRECATED: only for backward compatibility + if bd.Spec.FileSystem.MountPoint != "" { + return bd.Spec.FileSystem.MountPoint + } + + return fmt.Sprintf("/var/lib/harvester/extra-disks/%s", bd.Name) +} + +func convertFSInfoToString(fsInfo *block.FileSystemInfo) string { + // means this device is not mounted + if fsInfo.MountPoint == "" { + return "device is not mounted" + } + return fmt.Sprintf("mountpoint: %s, fsType: %s", fsInfo.MountPoint, fsInfo.Type) +} + +func needUpdateMountPoint(bd *diskv1.BlockDevice, filesystem *block.FileSystemInfo) NeedMountUpdateOP { + if filesystem == nil { + logrus.Debugf("Filesystem is not ready, skip the mount operation") + return NeedMountUpdateNoOp + } + + logrus.Debugf("Checking mount operation with FS.Provisioned %v, FS.Mountpoint %s", bd.Spec.FileSystem.Provisioned, filesystem.MountPoint) + if bd.Spec.FileSystem.Provisioned { + if filesystem.MountPoint == "" { + return NeedMountUpdateMount + } + if filesystem.MountPoint == extraDiskMountPoint(bd) { + logrus.Debugf("Already mounted, return no-op") + return NeedMountUpdateNoOp + } + return NeedMountUpdateUnmount | NeedMountUpdateMount + } + if filesystem.MountPoint != "" { + return NeedMountUpdateUnmount + } + return NeedMountUpdateNoOp +}