Skip to content

Commit

Permalink
controller: decompose the LH provisioner
Browse files Browse the repository at this point in the history
Signed-off-by: Vicente Cheng <vicente.cheng@suse.com>
  • Loading branch information
Vicente-Cheng committed Jun 5, 2024
1 parent 0641ea6 commit e65b17e
Show file tree
Hide file tree
Showing 3 changed files with 322 additions and 135 deletions.
182 changes: 47 additions & 135 deletions pkg/controller/blockdevice/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
ctldiskv1 "github.com/harvester/node-disk-manager/pkg/generated/controllers/harvesterhci.io/v1beta1"
ctllonghornv1 "github.com/harvester/node-disk-manager/pkg/generated/controllers/longhorn.io/v1beta2"
"github.com/harvester/node-disk-manager/pkg/option"
"github.com/harvester/node-disk-manager/pkg/provisioner"
"github.com/harvester/node-disk-manager/pkg/utils"
)

Expand Down Expand Up @@ -188,29 +189,23 @@ func Register(
return nil
}

func (c *Controller) handleDeviceRemove(device *diskv1.BlockDevice) error {
logrus.Infof("Prepare to stop provisioning device %s to node %s", device.Name, c.NodeName)
if err := c.unprovisionDeviceFromNode(device); err != nil {
err := fmt.Errorf("failed to stop provisioning device %s to node %s: %w", device.Name, c.NodeName, err)
logrus.Warnf("Removing disk %v error: %v", device.Name, err)
return err
}
return nil
}

// OnBlockDeviceChange watch the block device CR on change and performing disk operations
// like mounting the disks to a desired path via ext4
func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (*diskv1.BlockDevice, error) {
if device == nil || device.DeletionTimestamp != nil || device.Spec.NodeName != c.NodeName {
return nil, nil
}

deviceCpy := device.DeepCopy()
provisioner, _ := c.generateProvisioner(deviceCpy)

// handle remove device no matter inactive or corrupted, we will set `device.Spec.FileSystem.Provisioned` to false
if !device.Spec.FileSystem.Provisioned && device.Status.ProvisionPhase != diskv1.ProvisionPhaseUnprovisioned {
deviceCpy := device.DeepCopy()
if err := c.handleDeviceRemove(deviceCpy); err != nil {
diskv1.DiskAddedToNode.SetError(deviceCpy, "", err)
diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false)
if requeue, err := provisioner.UnProvision(); requeue {
if err != nil {
diskv1.DiskAddedToNode.SetError(deviceCpy, "", err)
diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false)
}
c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay())
}
if !reflect.DeepEqual(device, deviceCpy) {
Expand All @@ -231,7 +226,6 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (
return nil, errors.New(errorCacheDiskTagsNotInitialized)
}

deviceCpy := device.DeepCopy()
devPath, err := resolvePersistentDevPath(device)
if err != nil {
return nil, err
Expand Down Expand Up @@ -314,13 +308,23 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (
}
case needProvision && device.Status.ProvisionPhase == diskv1.ProvisionPhaseUnprovisioned:
logrus.Infof("Prepare to provision device %s to node %s", device.Name, c.NodeName)
if err := c.provisionDeviceToNode(deviceCpy); err != nil {
err := fmt.Errorf("failed to provision device %s to node %s: %w", device.Name, c.NodeName, err)
logrus.Error(err)
diskv1.DiskAddedToNode.SetError(deviceCpy, "", err)
diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false)
if requeue, err := provisioner.Provision(); requeue {
if err != nil {
err := fmt.Errorf("failed to provision device %s to node %s: %w", device.Name, c.NodeName, err)
diskv1.DiskAddedToNode.SetError(deviceCpy, "", err)
diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false)
}
c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay())

}
//logrus.Infof("Prepare to provision device %s to node %s", device.Name, c.NodeName)
//if err := c.provisionDeviceToNode(deviceCpy); err != nil {
// err := fmt.Errorf("failed to provision device %s to node %s: %w", device.Name, c.NodeName, err)
// logrus.Error(err)
// diskv1.DiskAddedToNode.SetError(deviceCpy, "", err)
// diskv1.DiskAddedToNode.SetStatusBool(deviceCpy, false)
// c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay())
//}
}

if !reflect.DeepEqual(device, deviceCpy) {
Expand All @@ -342,6 +346,29 @@ func (c *Controller) OnBlockDeviceChange(_ string, device *diskv1.BlockDevice) (
return nil, nil
}

Check notice on line 348 in pkg/controller/blockdevice/controller.go

View check run for this annotation

codefactor.io / CodeFactor

pkg/controller/blockdevice/controller.go#L194-L348

Complex Method
func (c *Controller) generateProvisioner(device *diskv1.BlockDevice) (provisioner.Provisioner, error) {
provisionerType := provisioner.TypeLonghornV1
if device.Spec.Provisioner != "" {
provisionerType = device.Spec.Provisioner
}
switch provisionerType {

Check notice on line 354 in pkg/controller/blockdevice/controller.go

View check run for this annotation

codefactor.io / CodeFactor

pkg/controller/blockdevice/controller.go#L354

Switch with only one case can be replaced by an if-then. (unnecessary-stmt)
case provisioner.TypeLonghornV1:
return c.generateLHv1Provisioner(device)
}
return nil, fmt.Errorf("unsupported provisioner type %s", provisionerType)
}

func (c *Controller) generateLHv1Provisioner(device *diskv1.BlockDevice) (provisioner.Provisioner, error) {
node, err := c.NodeCache.Get(c.Namespace, c.NodeName)
if apierrors.IsNotFound(err) {
node, err = c.Nodes.Get(c.Namespace, c.NodeName, metav1.GetOptions{})
}
if err != nil {
return nil, err
}
return provisioner.NewLHV1Provisioner(device, c.BlockInfo, node, c.Nodes, c.NodeCache), nil
}

func (c *Controller) updateDeviceMount(device *diskv1.BlockDevice, devPath string, filesystem *block.FileSystemInfo, needMountUpdate NeedMountUpdateOP) error {
logrus.Infof("Prepare to try %s", convertMountStr(needMountUpdate))
if device.Status.DeviceStatus.Partitioned {
Expand Down Expand Up @@ -534,111 +561,6 @@ func (c *Controller) provisionDeviceToNode(device *diskv1.BlockDevice) error {
return nil
}

// unprovisionDeviceFromNode removes a device from a longhorn node.
func (c *Controller) unprovisionDeviceFromNode(device *diskv1.BlockDevice) error {
node, err := c.Nodes.Get(c.Namespace, c.NodeName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
// Skip since the node is not there.
return nil
}
return err
}

updateProvisionPhaseUnprovisioned := func() {
msg := fmt.Sprintf("Disk not in longhorn node `%s`", c.NodeName)
device.Status.ProvisionPhase = diskv1.ProvisionPhaseUnprovisioned
diskv1.DiskAddedToNode.SetError(device, "", nil)
diskv1.DiskAddedToNode.SetStatusBool(device, false)
diskv1.DiskAddedToNode.Message(device, msg)
}

removeDiskFromNode := func() error {
nodeCpy := node.DeepCopy()
delete(nodeCpy.Spec.Disks, device.Name)
if _, err := c.Nodes.Update(nodeCpy); err != nil {
return err
}
return nil
}

isValidateToDelete := func(lhDisk longhornv1.DiskSpec) bool {
return !lhDisk.AllowScheduling
}

diskToRemove, ok := node.Spec.Disks[device.Name]
if !ok {
logrus.Infof("disk %s not in disks of longhorn node %s/%s", device.Name, c.Namespace, c.NodeName)
updateProvisionPhaseUnprovisioned()
return nil
}

isUnprovisioning := false
for _, tag := range device.Status.Tags {
if tag == utils.DiskRemoveTag {
isUnprovisioning = true
break
}
}

// for inactive/corrupted disk, we could remove it from node directly
if isUnprovisioning && isValidateToDelete(diskToRemove) &&
(device.Status.State == diskv1.BlockDeviceInactive || device.Status.DeviceStatus.FileSystem.Corrupted) {
logrus.Infof("disk (%s) is inactive or corrupted, remove it from node directly", device.Name)
// handle mountpoint first
filesystem := c.BlockInfo.GetFileSystemInfoByDevPath(device.Status.DeviceStatus.DevPath)
if filesystem != nil && filesystem.MountPoint != "" {
timeout := 30 * time.Second
if err := utils.ForceUmountWithTimeout(filesystem.MountPoint, timeout); err != nil {
logrus.Warnf("Force umount %v error: %v", filesystem.MountPoint, err)
}
// reset related fields
c.updateDeviceFileSystem(device, device.Status.DeviceStatus.DevPath)
device.Spec.Tags = []string{}
device.Status.Tags = []string{}
}
// remove the disk from node
if err := removeDiskFromNode(); err != nil {
return err
}
updateProvisionPhaseUnprovisioned()
return nil
}

if isUnprovisioning {
if status, ok := node.Status.DiskStatus[device.Name]; ok && len(status.ScheduledReplica) == 0 {
// Unprovision finished. Remove the disk.
if err := removeDiskFromNode(); err != nil {
return err
}
updateProvisionPhaseUnprovisioned()
logrus.Debugf("device %s is unprovisioned", device.Name)
} else {
// Still unprovisioning
c.Blockdevices.EnqueueAfter(c.Namespace, device.Name, jitterEnqueueDelay())
logrus.Debugf("device %s is unprovisioning, status: %+v, ScheduledReplica: %d", device.Name, node.Status.DiskStatus[device.Name], len(status.ScheduledReplica))
}
} else {
// Start unprovisioing
logrus.Debugf("Setup device %s to start unprovision", device.Name)
diskToRemove.AllowScheduling = false
diskToRemove.EvictionRequested = true
diskToRemove.Tags = append(diskToRemove.Tags, utils.DiskRemoveTag)
nodeCpy := node.DeepCopy()
nodeCpy.Spec.Disks[device.Name] = diskToRemove
if _, err := c.Nodes.Update(nodeCpy); err != nil {
return err
}
msg := fmt.Sprintf("Stop provisioning device %s to longhorn node `%s`", device.Name, c.NodeName)
device.Status.ProvisionPhase = diskv1.ProvisionPhaseUnprovisioning
diskv1.DiskAddedToNode.SetError(device, "", nil)
diskv1.DiskAddedToNode.SetStatusBool(device, false)
diskv1.DiskAddedToNode.Message(device, msg)
}

return nil
}

func (c *Controller) updateDeviceStatus(device *diskv1.BlockDevice, devPath string) error {
var newStatus diskv1.DeviceStatus
var needAutoProvision bool
Expand Down Expand Up @@ -855,13 +777,3 @@ func convertFSInfoToString(fsInfo *block.FileSystemInfo) string {
}
return fmt.Sprintf("mountpoint: %s, fsType: %s", fsInfo.MountPoint, fsInfo.Type)
}

func removeUnNeeded[T string | int](x []T, y []T) []T {
result := make([]T, 0)
for _, item := range x {
if !slices.Contains(y, item) {
result = append(result, item)
}
}
return result
}
34 changes: 34 additions & 0 deletions pkg/provisioner/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package provisioner

import (
diskv1 "github.com/harvester/node-disk-manager/pkg/apis/harvesterhci.io/v1beta1"
)

const (
TypeLonghornV1 = "Longhornv1"
TypeLonghornV2 = "Longhornv2"
TypeLVM = "LVM"
)

type Provisioner interface {
Format() (bool, error)
UnFormat() (bool, error)
Provision() (bool, error)
UnProvision() (bool, error)
Update() (bool, error)
GetProvisionerName() string
}

func setCondDiskAddedToNodeFalse(device *diskv1.BlockDevice, message string, targetStatus diskv1.BlockDeviceProvisionPhase) {
device.Status.ProvisionPhase = targetStatus
diskv1.DiskAddedToNode.SetError(device, "", nil)
diskv1.DiskAddedToNode.SetStatusBool(device, false)
diskv1.DiskAddedToNode.Message(device, message)
}

func setCondDiskAddedToNodeSuccess(device *diskv1.BlockDevice, message string, targetStatus diskv1.BlockDeviceProvisionPhase) {
device.Status.ProvisionPhase = targetStatus
diskv1.DiskAddedToNode.SetError(device, "", nil)
diskv1.DiskAddedToNode.SetStatusBool(device, true)
diskv1.DiskAddedToNode.Message(device, message)
}
Loading

0 comments on commit e65b17e

Please sign in to comment.