fix(storage-network): annotated pod in creation loop #2456

Merged
6 changes: 3 additions & 3 deletions controller/instance_manager_controller.go
@@ -271,94 +271,94 @@
)
}

func (imc *InstanceManagerController) syncInstanceManager(key string) (err error) {
defer func() {
err = errors.Wrapf(err, "failed to sync instance manager for %v", key)
}()
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
if namespace != imc.namespace {
return nil
}

im, err := imc.ds.GetInstanceManager(name)
if err != nil {
if datastore.ErrorIsNotFound(err) {
return imc.cleanupInstanceManager(name)
}
return errors.Wrap(err, "failed to get instance manager")
}

log := getLoggerForInstanceManager(imc.logger, im)

if !imc.isResponsibleFor(im) {
return nil
}

if im.Status.OwnerID != imc.controllerID {
im.Status.OwnerID = imc.controllerID
im, err = imc.ds.UpdateInstanceManagerStatus(im)
if err != nil {
// we don't mind others coming first
if apierrors.IsConflict(errors.Cause(err)) {
return nil
}
return err
}
log.Infof("Instance Manager got new owner %v", imc.controllerID)
}

if im.DeletionTimestamp != nil {
return imc.cleanupInstanceManager(im.Name)
}

existingIM := im.DeepCopy()
defer func() {
if err == nil && !reflect.DeepEqual(existingIM.Status, im.Status) {
_, err = imc.ds.UpdateInstanceManagerStatus(im)
}
if apierrors.IsConflict(errors.Cause(err)) {
log.WithError(err).Debugf("Requeue %v due to conflict", key)
imc.enqueueInstanceManager(im)
err = nil
}
}()

if err := imc.syncStatusWithPod(im); err != nil {
return err
}

if err := imc.syncStatusWithNode(im); err != nil {
return err
}

if err := imc.syncInstanceStatus(im); err != nil {
return err
}

if err := imc.handlePod(im); err != nil {
return err
}

if err := imc.syncInstanceManagerPDB(im); err != nil {
return err
}

if err := imc.syncInstanceManagerAPIVersion(im); err != nil {
return err
}

if err := imc.syncMonitor(im); err != nil {
return err
}

return nil
}
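
The deep copy plus deferred compare-and-update at the top of this function is a common controller idiom: mutate the in-memory status freely, persist it once on the way out, and swallow update conflicts by requeueing the object. A minimal standalone sketch of that shape, using made-up types and helpers rather than Longhorn's actual API:

package main

import (
	"errors"
	"fmt"
	"reflect"
)

// Hypothetical stand-ins for the real CR, datastore call, and conflict error.
type status struct{ OwnerID string }
type instanceManager struct{ Status status }

var errConflict = errors.New("conflict")

func updateStatus(im *instanceManager) error { return nil }              // pretend API update
func enqueue(im *instanceManager)            { fmt.Println("requeued") } // pretend workqueue add

func sync(im *instanceManager) (err error) {
	existing := *im // snapshot before mutating, like existingIM := im.DeepCopy()
	defer func() {
		// Persist the status only when the sync succeeded and something actually changed.
		if err == nil && !reflect.DeepEqual(existing.Status, im.Status) {
			err = updateStatus(im)
		}
		// A conflict means another writer won; requeue and retry instead of erroring out.
		if errors.Is(err, errConflict) {
			enqueue(im)
			err = nil
		}
	}()

	im.Status.OwnerID = "node-1" // the body mutates im.Status freely
	return nil
}

func main() { fmt.Println(sync(&instanceManager{})) }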

// syncStatusWithPod updates the InstanceManager based on the pod's current phase only,
// regardless of the InstanceManager's previous status.

func (imc *InstanceManagerController) syncStatusWithPod(im *longhorn.InstanceManager) error {
log := getLoggerForInstanceManager(imc.logger, im)

@@ -522,57 +522,57 @@
return nil
}

func (imc *InstanceManagerController) areDangerZoneSettingsSyncedToIMPod(im *longhorn.InstanceManager) (isSynced, isPodDeletedOrNotRunning, areInstancesRunningInPod bool, err error) {
if im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
return false, true, false, nil
}

// nolint:all
for _, instance := range types.ConsolidateInstances(im.Status.InstanceEngines, im.Status.InstanceReplicas, im.Status.Instances) {
if instance.Status.State == longhorn.InstanceStateRunning || instance.Status.State == longhorn.InstanceStateStarting {
return false, false, true, nil
}
}

pod, err := imc.ds.GetPod(im.Name)
pod, err := imc.ds.GetPodRO(im.Namespace, im.Name)
if err != nil {
return false, false, false, errors.Wrapf(err, "cannot get pod for instance manager %v", im.Name)
}
if pod == nil {
return false, true, false, nil
}

for settingName := range types.GetDangerZoneSettings() {
isSettingSynced := true
setting, err := imc.ds.GetSettingWithAutoFillingRO(settingName)
if err != nil {
return false, false, false, err
}
switch settingName {
case types.SettingNameTaintToleration:
isSettingSynced, err = imc.isSettingTaintTolerationSynced(setting, pod)
case types.SettingNameSystemManagedComponentsNodeSelector:
isSettingSynced, err = imc.isSettingNodeSelectorSynced(setting, pod)
case types.SettingNameGuaranteedInstanceManagerCPU, types.SettingNameV2DataEngineGuaranteedInstanceManagerCPU:
isSettingSynced, err = imc.isSettingGuaranteedInstanceManagerCPUSynced(setting, pod)
case types.SettingNamePriorityClass:
isSettingSynced, err = imc.isSettingPriorityClassSynced(setting, pod)
case types.SettingNameStorageNetwork:
isSettingSynced, err = imc.isSettingStorageNetworkSynced(setting, pod)
case types.SettingNameV1DataEngine, types.SettingNameV2DataEngine:
isSettingSynced, err = imc.isSettingDataEngineSynced(settingName, im)
}
if err != nil {
return false, false, false, err
}
if !isSettingSynced {
return false, false, false, nil
}
}

return true, false, false, nil
}
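
The switch above is effectively a dispatch from danger-zone setting name to a per-setting check, and the first unsynced setting short-circuits the result. A small illustrative sketch of that pattern; the checker signature and values are simplified stand-ins, not the real functions:

package main

import "fmt"

// checker reports whether a single setting is already reflected on the pod.
type checker func(settingValue string, podAnnotations map[string]string) bool

func main() {
	checkers := map[string]checker{
		"storage-network": func(v string, ann map[string]string) bool {
			// Compare against the value the pod is expected to carry.
			return ann["k8s.v1.cni.cncf.io/networks"] == v
		},
	}

	pod := map[string]string{"k8s.v1.cni.cncf.io/networks": "kube-system/storage-nad"}
	for name, check := range checkers {
		if !check("kube-system/storage-nad", pod) {
			fmt.Println(name, "not synced") // first unsynced setting ends the scan
			return
		}
	}
	fmt.Println("all danger-zone settings synced")
}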

func (imc *InstanceManagerController) isSettingTaintTolerationSynced(setting *longhorn.Setting, pod *corev1.Pod) (bool, error) {
newTolerationsList, err := types.UnmarshalTolerations(setting.Value)
if err != nil {
@@ -622,8 +622,8 @@

func (imc *InstanceManagerController) isSettingStorageNetworkSynced(setting *longhorn.Setting, pod *corev1.Pod) (bool, error) {
nadAnnot := string(types.CNIAnnotationNetworks)

return pod.Annotations[nadAnnot] == setting.Value, nil
nadAnnotValue := types.CreateCniAnnotationFromSetting(setting)
return pod.Annotations[nadAnnot] == nadAnnotValue, nil
}
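
This is the core of the fix: the storage-network setting stores the NetworkAttachmentDefinition reference, while the annotation Longhorn writes onto the instance manager pod is derived from that value, so comparing the raw setting value against the pod annotation stops matching once the two formats differ, and the controller keeps flagging a healthy pod as out of sync and recreating it. Below is a minimal standalone sketch of the before/after comparison; the helper and its "@lhnet1" suffix are illustrative assumptions, not the real types.CreateCniAnnotationFromSetting.

package main

import "fmt"

// createCniAnnotationFromSetting mimics deriving the pod's CNI annotation from the
// storage-network setting value. The "@lhnet1" interface suffix is an assumption
// for illustration only; the real helper may build the value differently.
func createCniAnnotationFromSetting(settingValue string) string {
	if settingValue == "" {
		return ""
	}
	return settingValue + "@lhnet1"
}

func main() {
	settingValue := "kube-system/storage-nad"
	podAnnotation := createCniAnnotationFromSetting(settingValue) // what the running pod carries

	// Old check: raw setting value vs. derived annotation -> never equal, so the pod
	// was reported as out of sync on every reconcile and ended up in a deletion loop.
	fmt.Println("old check synced:", podAnnotation == settingValue)

	// New check: derive the expected annotation the same way before comparing.
	fmt.Println("new check synced:", podAnnotation == createCniAnnotationFromSetting(settingValue))
}

Routing both the annotation writer and this comparison through the same helper keeps the two representations from drifting apart again.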

func (imc *InstanceManagerController) isSettingDataEngineSynced(settingName types.SettingName, im *longhorn.InstanceManager) (bool, error) {
@@ -682,92 +682,92 @@
return nil
}

func (imc *InstanceManagerController) syncInstanceManagerPDB(im *longhorn.InstanceManager) error {
if err := imc.cleanUpPDBForNonExistingIM(); err != nil {
return err
}

if im.Status.CurrentState != longhorn.InstanceManagerStateRunning {
return nil
}

unschedulable, err := imc.ds.IsKubeNodeUnschedulable(im.Spec.NodeID)
if err != nil {
return err
}

imPDB, err := imc.ds.GetPDBRO(types.GetPDBName(im))
if err != nil && !datastore.ErrorIsNotFound(err) {
return err
}

// When the current node is unschedulable, it is a signal that the node is being
// cordoned/drained. The replica IM PDB can be deleted when there is at least one
// IM PDB on another schedulable node to protect detached volume data.
//
// During a Cluster Autoscaler scale down, a node being marked unschedulable means
// CA has already decided that this node is not blocked by any pod PDB limit.
// Hence there is no need to check when Cluster Autoscaler is enabled.
if unschedulable {
if imPDB == nil {
return nil
}

canDeletePDB, err := imc.canDeleteInstanceManagerPDB(im)
if err != nil {
return err
}

if !canDeletePDB {
return nil
}

imc.logger.Infof("Removing %v PDB since Node %v is marked unschedulable", im.Name, imc.controllerID)
return imc.deleteInstanceManagerPDB(im)
}

// If the setting is enabled, Longhorn needs to retain as few IM PDBs as
// possible. Each volume will have at least one replica under the protection
// of an IM PDB, with no redundant PDBs blocking the Cluster Autoscaler from
// scaling down.
// CA considers a node unremovable when there are strict PDB limits
// protecting the pods on the node.
//
// If the setting is disabled, Longhorn will blindly create IM PDBs for all
// engine and replica IMs.
clusterAutoscalerEnabled, err := imc.ds.GetSettingAsBool(types.SettingNameKubernetesClusterAutoscalerEnabled)
if err != nil {
return err
}

if clusterAutoscalerEnabled {
canDeletePDB, err := imc.canDeleteInstanceManagerPDB(im)
if err != nil {
return err
}

if !canDeletePDB {
if imPDB == nil {
return imc.createInstanceManagerPDB(im)
}
return nil
}

if imPDB != nil {
return imc.deleteInstanceManagerPDB(im)
}

return nil
}

// Make sure that there is a PodDisruptionBudget to protect this instance manager in the normal case.
if imPDB == nil {
return imc.createInstanceManagerPDB(im)
}

return nil
}
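
The create/delete decisions above act on a PodDisruptionBudget that pins the instance manager pod in place while a node is drained. A hedged sketch of what such an object can look like with the policy/v1 types; the selector label and minAvailable value are assumptions, not necessarily what createInstanceManagerPDB produces.

package main

import (
	"fmt"

	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// buildIMPDB sketches a PDB that blocks eviction of a single instance manager pod.
// The label key and minAvailable=1 are illustrative assumptions.
func buildIMPDB(imName, namespace string) *policyv1.PodDisruptionBudget {
	minAvailable := intstr.FromInt(1)
	return &policyv1.PodDisruptionBudget{
		ObjectMeta: metav1.ObjectMeta{
			Name:      imName,
			Namespace: namespace,
		},
		Spec: policyv1.PodDisruptionBudgetSpec{
			MinAvailable: &minAvailable,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"longhorn.io/instance-manager-name": imName},
			},
		},
	}
}

func main() {
	pdb := buildIMPDB("instance-manager-abc123", "longhorn-system")
	fmt.Println(pdb.Name, pdb.Spec.MinAvailable.IntValue())
}

With minAvailable equal to the number of selected pods, any voluntary eviction of the instance manager pod is rejected, which is why the controller has to delete the PDB itself before a cordoned node can finish draining.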

func (imc *InstanceManagerController) cleanUpPDBForNonExistingIM() error {
ims, err := imc.ds.ListInstanceManagersRO()
if err != nil {
@@ -819,98 +819,98 @@
return nil
}

func (imc *InstanceManagerController) canDeleteInstanceManagerPDB(im *longhorn.InstanceManager) (bool, error) {
// If there is no engine instance process inside the engine instance manager,
// it means that all volumes are detached.
// We can delete the PodDisruptionBudget for the engine instance manager.
if im.Spec.Type == longhorn.InstanceManagerTypeEngine {
if len(im.Status.InstanceEngines)+len(im.Status.Instances) == 0 {
return true, nil
}
return false, nil
}

// Make sure that the instance manager is of type replica or all-in-one
if im.Spec.Type != longhorn.InstanceManagerTypeReplica && im.Spec.Type != longhorn.InstanceManagerTypeAllInOne {
return false, fmt.Errorf("the instance manager %v has invalid type: %v ", im.Name, im.Spec.Type)
}

// Must wait for all volumes to be detached from the current node first.
// This also means that we must wait until the PDB of the engine instance manager
// on the current node is deleted.
allVolumeDetached, err := imc.areAllVolumesDetachedFromNode(im.Spec.NodeID)
if err != nil {
return false, err
}
if !allVolumeDetached {
return false, nil
}

nodeDrainingPolicy, err := imc.ds.GetSettingValueExisted(types.SettingNameNodeDrainPolicy)
if err != nil {
return false, err
}
if nodeDrainingPolicy == string(types.NodeDrainPolicyAlwaysAllow) {
return true, nil
}

replicasOnCurrentNode, err := imc.ds.ListReplicasByNodeRO(im.Spec.NodeID)
if err != nil {
if datastore.ErrorIsNotFound(err) {
return true, nil
}
return false, err
}

if nodeDrainingPolicy == string(types.NodeDrainPolicyBlockForEviction) && len(replicasOnCurrentNode) > 0 {
// We must wait for ALL replicas to be evicted before removing the PDB.
return false, nil
}

targetReplicas := []*longhorn.Replica{}
if nodeDrainingPolicy == string(types.NodeDrainPolicyAllowIfReplicaIsStopped) {
for _, replica := range replicasOnCurrentNode {
if replica.Spec.DesireState != longhorn.InstanceStateStopped || replica.Status.CurrentState != longhorn.InstanceStateStopped {
targetReplicas = append(targetReplicas, replica)
}
}
} else {
targetReplicas = replicasOnCurrentNode
}

// For each replica in the target replica list, find out whether there is a PDB protected healthy replica of the
// same volume on another schedulable node.
for _, replica := range targetReplicas {
hasPDBOnAnotherNode := false
isUnusedReplicaOnCurrentNode := false

pdbProtectedHealthyReplicas, err := imc.ds.ListVolumePDBProtectedHealthyReplicasRO(replica.Spec.VolumeName)
if err != nil {
return false, err
}
for _, pdbProtectedHealthyReplica := range pdbProtectedHealthyReplicas {
if pdbProtectedHealthyReplica.Spec.NodeID != im.Spec.NodeID {
hasPDBOnAnotherNode = true
break
}
}

// If a replica has never been started, there is no data stored in this replica, and retaining it makes no sense
// for HA. Hence Longhorn doesn't need to block the PDB removal for the replica. This case typically happens on
// a newly created volume that hasn't been attached to any node.
// https://github.com/longhorn/longhorn/issues/2673
isUnusedReplicaOnCurrentNode = replica.Spec.HealthyAt == "" &&
replica.Spec.FailedAt == "" &&
replica.Spec.NodeID == im.Spec.NodeID

if !hasPDBOnAnotherNode && !isUnusedReplicaOnCurrentNode {
return false, nil
}
}

return true, nil
}
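
For replica instance managers the outcome hinges on the node-drain-policy setting. A condensed, illustrative decision helper for the three policies referenced above (policy names written as plain strings here for brevity); it deliberately omits the per-volume walk that looks for a PDB-protected healthy replica on another node, which the real code still performs for any remaining replicas.

package main

import "fmt"

// canDeleteReplicaIMPDB condenses the drain-policy branch above (simplified).
func canDeleteReplicaIMPDB(policy string, replicasOnNode, stoppedReplicasOnNode int) bool {
	switch policy {
	case "always-allow":
		return true // never block a drain on this node's replicas
	case "block-for-eviction":
		return replicasOnNode == 0 // wait until every replica has been evicted
	case "allow-if-replica-is-stopped":
		return replicasOnNode == stoppedReplicasOnNode // only stopped replicas may remain
	default:
		return false // other policies fall through to the per-volume replica checks
	}
}

func main() {
	fmt.Println(canDeleteReplicaIMPDB("block-for-eviction", 2, 2))          // false
	fmt.Println(canDeleteReplicaIMPDB("allow-if-replica-is-stopped", 2, 2)) // true
}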

func (imc *InstanceManagerController) areAllVolumesDetachedFromNode(nodeName string) (bool, error) {
detached, err := imc.areAllInstanceRemovedFromNodeByType(nodeName, longhorn.InstanceManagerTypeEngine)
if err != nil {
10 changes: 9 additions & 1 deletion controller/setting_controller.go
@@ -264,74 +264,74 @@
return settingDefinition.Category == types.SettingCategoryDangerZone
}

func (sc *SettingController) syncDangerZoneSettingsForManagedComponents(settingName types.SettingName) error {
if !sc.isDangerZoneSetting(settingName) {
return nil
}

dangerSettingsRequiringAllVolumesDetached := []types.SettingName{
types.SettingNameTaintToleration,
types.SettingNameSystemManagedComponentsNodeSelector,
types.SettingNameGuaranteedInstanceManagerCPU,
types.SettingNameV2DataEngineGuaranteedInstanceManagerCPU,
types.SettingNamePriorityClass,
types.SettingNameStorageNetwork,
}

if slices.Contains(dangerSettingsRequiringAllVolumesDetached, settingName) {
detached, _, err := sc.ds.AreAllVolumesDetached(longhorn.DataEngineTypeAll)
if err != nil {
return errors.Wrapf(err, "failed to check volume detachment for %v setting update", settingName)
}

if !detached {
return errors.Errorf("failed to apply %v setting to Longhorn components when there are attached volumes. It will be eventually applied", settingName)
}

switch settingName {
case types.SettingNameTaintToleration:
if err := sc.updateTaintToleration(); err != nil {
return err
}
case types.SettingNameSystemManagedComponentsNodeSelector:
if err := sc.updateNodeSelector(); err != nil {
return err
}
case types.SettingNameGuaranteedInstanceManagerCPU, types.SettingNameV2DataEngineGuaranteedInstanceManagerCPU:
if err := sc.updateInstanceManagerCPURequest(); err != nil {
return err
}
case types.SettingNamePriorityClass:
if err := sc.updatePriorityClass(); err != nil {
return err
}
case types.SettingNameStorageNetwork:
if err := sc.updateCNI(); err != nil {
return err
}
}

return nil
}

// These settings are also protected by webhook validators when there are new updates.
// Updating them is only allowed when all volumes of the data engine are detached.
dangerSettingsRequiringSpecificDataEngineVolumesDetached := []types.SettingName{
types.SettingNameV1DataEngine,
types.SettingNameV2DataEngine,
}

if slices.Contains(dangerSettingsRequiringSpecificDataEngineVolumesDetached, settingName) {
if err := sc.updateDataEngine(settingName); err != nil {
return errors.Wrapf(err, "failed to apply %v setting to Longhorn instance managers when there are attached volumes. "+
"It will be eventually applied", settingName)
}
}

return nil
}

// getResponsibleNodeID returns which node needs to run

func getResponsibleNodeID(ds *datastore.DataStore) (string, error) {
readyNodes, err := ds.ListReadyNodesRO()
if err != nil {
@@ -352,111 +352,111 @@
return responsibleNodes[0], nil
}

func (sc *SettingController) syncBackupTarget() (err error) {
defer func() {
err = errors.Wrap(err, "failed to sync backup target")
}()

stopTimer := func() {
if sc.bsTimer != nil {
sc.bsTimer.Stop()
sc.bsTimer = nil
}
}

responsibleNodeID, err := getResponsibleNodeID(sc.ds)
if err != nil {
return errors.Wrap(err, "failed to select node for sync backup target")
}
if responsibleNodeID != sc.controllerID {
stopTimer()
return nil
}

// Get settings
targetSetting, err := sc.ds.GetSettingWithAutoFillingRO(types.SettingNameBackupTarget)
if err != nil {
return err
}

secretSetting, err := sc.ds.GetSettingWithAutoFillingRO(types.SettingNameBackupTargetCredentialSecret)
if err != nil {
return err
}

interval, err := sc.ds.GetSettingAsInt(types.SettingNameBackupstorePollInterval)
if err != nil {
return err
}
pollInterval := time.Duration(interval) * time.Second

backupTarget, err := sc.ds.GetBackupTarget(types.DefaultBackupTargetName)
if err != nil {
if !datastore.ErrorIsNotFound(err) {
return err
}

// Create the default BackupTarget CR if not present
backupTarget, err = sc.ds.CreateBackupTarget(&longhorn.BackupTarget{
ObjectMeta: metav1.ObjectMeta{
Name: types.DefaultBackupTargetName,
},
Spec: longhorn.BackupTargetSpec{
BackupTargetURL: targetSetting.Value,
CredentialSecret: secretSetting.Value,
PollInterval: metav1.Duration{Duration: pollInterval},
},
})
if err != nil {
return errors.Wrap(err, "failed to create backup target")
}
}

existingBackupTarget := backupTarget.DeepCopy()
defer func() {
backupTarget.Spec.BackupTargetURL = targetSetting.Value
backupTarget.Spec.CredentialSecret = secretSetting.Value
backupTarget.Spec.PollInterval = metav1.Duration{Duration: pollInterval}
if !reflect.DeepEqual(existingBackupTarget.Spec, backupTarget.Spec) {
// Force sync the backup target once the BackupTarget spec is updated
backupTarget.Spec.SyncRequestedAt = metav1.Time{Time: time.Now().UTC()}
if _, err = sc.ds.UpdateBackupTarget(backupTarget); err != nil && !apierrors.IsConflict(errors.Cause(err)) {
sc.logger.WithError(err).Warn("Failed to update backup target")
}
if err = sc.handleSecretsForAWSIAMRoleAnnotation(backupTarget.Spec.BackupTargetURL, existingBackupTarget.Spec.CredentialSecret, secretSetting.Value, existingBackupTarget.Spec.BackupTargetURL != targetSetting.Value); err != nil {
sc.logger.WithError(err).Warn("Failed to update secrets for AWSIAMRoleAnnotation")
}
}
}()

noNeedMonitor := targetSetting.Value == "" || pollInterval == time.Duration(0)
if noNeedMonitor {
stopTimer()
return nil
}

if sc.bsTimer != nil {
if sc.bsTimer.pollInterval == pollInterval {
// No need to start a new timer if there was one
return
}
// Stop the timer if the poll interval changes
stopTimer()
}

// Start backup store timer
sc.bsTimer = &BackupStoreTimer{
logger: sc.logger.WithField("component", "backup-store-timer"),
controllerID: sc.controllerID,
ds: sc.ds,

pollInterval: pollInterval,
stopCh: make(chan struct{}),
}
go sc.bsTimer.Start()
return nil
}
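
The BackupStoreTimer started at the end of this function polls the backup store on the configured interval until its stop channel is closed. A generic standalone sketch of that poll loop, with hypothetical names rather than the actual BackupStoreTimer implementation:

package main

import (
	"fmt"
	"time"
)

// pollTimer is a generic stand-in for a backup-store poller: tick on pollInterval,
// run a sync on each tick, and exit once stopCh is closed.
type pollTimer struct {
	pollInterval time.Duration
	stopCh       chan struct{}
}

func (t *pollTimer) start(sync func()) {
	ticker := time.NewTicker(t.pollInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			sync()
		case <-t.stopCh:
			return
		}
	}
}

func main() {
	t := &pollTimer{pollInterval: 50 * time.Millisecond, stopCh: make(chan struct{})}
	go t.start(func() { fmt.Println("poll backup target") })
	time.Sleep(120 * time.Millisecond)
	close(t.stopCh) // mirrors stopTimer() when polling is disabled or the interval changes
	time.Sleep(20 * time.Millisecond)
}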

func (sc *SettingController) handleSecretsForAWSIAMRoleAnnotation(backupTargetURL, oldSecretName, newSecretName string, isBackupTargetURLChanged bool) (err error) {
isSameSecretName := oldSecretName == newSecretName
if isSameSecretName && !isBackupTargetURLChanged {
@@ -547,88 +547,88 @@
}

// updateTaintToleration deletes all user-deployed and system-managed components immediately with the updated taint toleration.
func (sc *SettingController) updateTaintToleration() error {
setting, err := sc.ds.GetSettingWithAutoFillingRO(types.SettingNameTaintToleration)
if err != nil {
return err
}
newTolerations := setting.Value
newTolerationsList, err := types.UnmarshalTolerations(newTolerations)
if err != nil {
return err
}
newTolerationsMap := util.TolerationListToMap(newTolerationsList)

daemonsetList, err := sc.ds.ListDaemonSetWithLabels(types.GetBaseLabelsForSystemManagedComponent())
if err != nil {
return errors.Wrap(err, "failed to list Longhorn daemonsets for toleration update")
}

deploymentList, err := sc.ds.ListDeploymentWithLabels(types.GetBaseLabelsForSystemManagedComponent())
if err != nil {
return errors.Wrap(err, "failed to list Longhorn deployments for toleration update")
}

imPodList, err := sc.ds.ListInstanceManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list instance manager pods for toleration update")
}

smPodList, err := sc.ds.ListShareManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list share manager pods for toleration update")
}

bimPodList, err := sc.ds.ListBackingImageManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list backing image manager pods for toleration update")
}

for _, dp := range deploymentList {
lastAppliedTolerationsList, err := getLastAppliedTolerationsList(dp)
if err != nil {
return err
}
if reflect.DeepEqual(util.TolerationListToMap(lastAppliedTolerationsList), newTolerationsMap) {
continue
}
if err := sc.updateTolerationForDeployment(dp, lastAppliedTolerationsList, newTolerationsList); err != nil {
return err
}
}

for _, ds := range daemonsetList {
lastAppliedTolerationsList, err := getLastAppliedTolerationsList(ds)
if err != nil {
return err
}
if reflect.DeepEqual(util.TolerationListToMap(lastAppliedTolerationsList), newTolerationsMap) {
continue
}
if err := sc.updateTolerationForDaemonset(ds, lastAppliedTolerationsList, newTolerationsList); err != nil {
return err
}
}

pods := append(imPodList, smPodList...)
pods = append(pods, bimPodList...)
for _, pod := range pods {
lastAppliedTolerations, err := getLastAppliedTolerationsList(pod)
if err != nil {
return err
}
if reflect.DeepEqual(util.TolerationListToMap(lastAppliedTolerations), newTolerationsMap) {
continue
}
sc.logger.Infof("Deleting pod %v to update tolerations from %v to %v", pod.Name, util.TolerationListToMap(lastAppliedTolerations), newTolerationsMap)
if err := sc.ds.DeletePod(pod.Name); err != nil {
return err
}
}

return nil
}
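
Deployments, daemonsets, and pods are skipped when their last-applied tolerations already match the new setting, and the comparison goes through a map so that list ordering does not matter. A small sketch of that comparison with corev1.Toleration; the annotation payload and the map-key format are illustrative assumptions, not util.TolerationListToMap.

package main

import (
	"encoding/json"
	"fmt"
	"reflect"

	corev1 "k8s.io/api/core/v1"
)

// tolerationsToMap keys tolerations so that list order is irrelevant when comparing.
// The key format is illustrative only.
func tolerationsToMap(list []corev1.Toleration) map[string]corev1.Toleration {
	m := map[string]corev1.Toleration{}
	for _, t := range list {
		m[fmt.Sprintf("%s=%s:%s", t.Key, t.Value, t.Effect)] = t
	}
	return m
}

func main() {
	// Pretend this JSON came from a "last applied tolerations" annotation on the object.
	lastApplied := `[{"key":"node-role","operator":"Equal","value":"storage","effect":"NoSchedule"}]`
	var last []corev1.Toleration
	if err := json.Unmarshal([]byte(lastApplied), &last); err != nil {
		panic(err)
	}

	newList := []corev1.Toleration{
		{Key: "node-role", Operator: corev1.TolerationOpEqual, Value: "storage", Effect: corev1.TaintEffectNoSchedule},
	}

	// Equal maps -> nothing to do; otherwise the pod would be deleted so that its
	// owner recreates it with the new tolerations.
	fmt.Println("in sync:", reflect.DeepEqual(tolerationsToMap(last), tolerationsToMap(newList)))
}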

func (sc *SettingController) updateTolerationForDeployment(dp *appsv1.Deployment, lastAppliedTolerations, newTolerations []corev1.Toleration) error {
existingTolerationsMap := util.TolerationListToMap(dp.Spec.Template.Spec.Tolerations)
lastAppliedTolerationsMap := util.TolerationListToMap(lastAppliedTolerations)
@@ -687,74 +687,74 @@
}

// updatePriorityClass deletes all user-deployed and system-managed components immediately with the updated priority class.
func (sc *SettingController) updatePriorityClass() error {
setting, err := sc.ds.GetSettingWithAutoFillingRO(types.SettingNamePriorityClass)
if err != nil {
return err
}
newPriorityClass := setting.Value

daemonsetList, err := sc.ds.ListDaemonSetWithLabels(types.GetBaseLabelsForSystemManagedComponent())
if err != nil {
return errors.Wrap(err, "failed to list Longhorn daemonsets for priority class update")
}

deploymentList, err := sc.ds.ListDeploymentWithLabels(types.GetBaseLabelsForSystemManagedComponent())
if err != nil {
return errors.Wrap(err, "failed to list Longhorn deployments for priority class update")
}

imPodList, err := sc.ds.ListInstanceManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list instance manager pods for priority class update")
}

smPodList, err := sc.ds.ListShareManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list share manager pods for priority class update")
}

bimPodList, err := sc.ds.ListBackingImageManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list backing image manager pods for priority class update")
}

for _, dp := range deploymentList {
if dp.Spec.Template.Spec.PriorityClassName == newPriorityClass {
continue
}
sc.logger.Infof("Updating the priority class from %v to %v for %v", dp.Spec.Template.Spec.PriorityClassName, newPriorityClass, dp.Name)
dp.Spec.Template.Spec.PriorityClassName = newPriorityClass
if _, err := sc.ds.UpdateDeployment(dp); err != nil {
return err
}
}
for _, ds := range daemonsetList {
if ds.Spec.Template.Spec.PriorityClassName == newPriorityClass {
continue
}
sc.logger.Infof("Updating the priority class from %v to %v for %v", ds.Spec.Template.Spec.PriorityClassName, newPriorityClass, ds.Name)
ds.Spec.Template.Spec.PriorityClassName = newPriorityClass
if _, err := sc.ds.UpdateDaemonSet(ds); err != nil {
return err
}
}

pods := append(imPodList, smPodList...)
pods = append(pods, bimPodList...)
for _, pod := range pods {
if pod.Spec.PriorityClassName == newPriorityClass {
continue
}
sc.logger.Infof("Deleting pod %v to update the priority class from %v to %v", pod.Name, pod.Spec.PriorityClassName, newPriorityClass)
if err := sc.ds.DeletePod(pod.Name); err != nil {
return err
}
}

return nil
}

func (sc *SettingController) updateKubernetesClusterAutoscalerEnabled() error {
// IM pod annotations will be handled in the instance manager controller

@@ -820,6 +820,8 @@
}

nadAnnot := string(types.CNIAnnotationNetworks)
nadAnnotValue := types.CreateCniAnnotationFromSetting(storageNetwork)

imPodList, err := sc.ds.ListInstanceManagerPods()
if err != nil {
return errors.Wrapf(err, "failed to list instance manager Pods for %v setting update", types.SettingNameStorageNetwork)
@@ -832,10 +834,16 @@

pods := append(imPodList, bimPodList...)
for _, pod := range pods {
if pod.Annotations[nadAnnot] == storageNetwork.Value {
if pod.Annotations[nadAnnot] == nadAnnotValue {
continue
}

logrus.WithFields(logrus.Fields{
"pod": pod.Name,
"oldValue": pod.Annotations[nadAnnot],
"newValue": nadAnnotValue,
}).Infof("Deleting pod to update the %v annotation", nadAnnot)

if err := sc.ds.DeletePod(pod.Name); err != nil {
return err
}
@@ -937,86 +945,86 @@
}

// updateNodeSelector deletes all user-deployed and system-managed components immediately with the updated node selector.
func (sc *SettingController) updateNodeSelector() error {
setting, err := sc.ds.GetSettingWithAutoFillingRO(types.SettingNameSystemManagedComponentsNodeSelector)
if err != nil {
return err
}
newNodeSelector, err := types.UnmarshalNodeSelector(setting.Value)
if err != nil {
return err
}
deploymentList, err := sc.ds.ListDeploymentWithLabels(types.GetBaseLabelsForSystemManagedComponent())
if err != nil {
return errors.Wrap(err, "failed to list Longhorn deployments for node selector update")
}
daemonsetList, err := sc.ds.ListDaemonSetWithLabels(types.GetBaseLabelsForSystemManagedComponent())
if err != nil {
return errors.Wrap(err, "failed to list Longhorn daemonsets for node selector update")
}
imPodList, err := sc.ds.ListInstanceManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list instance manager pods for node selector update")
}
smPodList, err := sc.ds.ListShareManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list share manager pods for node selector update")
}
bimPodList, err := sc.ds.ListBackingImageManagerPods()
if err != nil {
return errors.Wrap(err, "failed to list backing image manager pods for node selector update")
}
for _, dp := range deploymentList {
if dp.Spec.Template.Spec.NodeSelector == nil {
if len(newNodeSelector) == 0 {
continue
}
}
if reflect.DeepEqual(dp.Spec.Template.Spec.NodeSelector, newNodeSelector) {
continue
}
sc.logger.Infof("Updating the node selector from %v to %v for %v", dp.Spec.Template.Spec.NodeSelector, newNodeSelector, dp.Name)
dp.Spec.Template.Spec.NodeSelector = newNodeSelector
if _, err := sc.ds.UpdateDeployment(dp); err != nil {
return err
}
}
for _, ds := range daemonsetList {
if ds.Spec.Template.Spec.NodeSelector == nil {
if len(newNodeSelector) == 0 {
continue
}
}
if reflect.DeepEqual(ds.Spec.Template.Spec.NodeSelector, newNodeSelector) {
continue
}
sc.logger.Infof("Updating the node selector from %v to %v for %v", ds.Spec.Template.Spec.NodeSelector, newNodeSelector, ds.Name)
ds.Spec.Template.Spec.NodeSelector = newNodeSelector
if _, err := sc.ds.UpdateDaemonSet(ds); err != nil {
return err
}
}
pods := append(imPodList, smPodList...)
pods = append(pods, bimPodList...)
for _, pod := range pods {
if pod.Spec.NodeSelector == nil {
if len(newNodeSelector) == 0 {
continue
}
}
if reflect.DeepEqual(pod.Spec.NodeSelector, newNodeSelector) {
continue
}
if pod.DeletionTimestamp == nil {
sc.logger.Infof("Deleting pod %v to update the node selector from %v to %v", pod.Name, pod.Spec.NodeSelector, newNodeSelector)
if err := sc.ds.DeletePod(pod.Name); err != nil {
return err
}
}
}
return nil
}
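
The explicit nil checks ahead of the reflect.DeepEqual calls matter because DeepEqual treats a nil map and an empty map as different, while "no node selector configured" should count as matching an empty selector. A short illustration:

package main

import (
	"fmt"
	"reflect"
)

func main() {
	var current map[string]string  // object has no nodeSelector set at all
	desired := map[string]string{} // setting resolves to an empty selector

	fmt.Println(reflect.DeepEqual(current, desired))    // false: nil != empty for DeepEqual
	fmt.Println(len(current) == 0 && len(desired) == 0) // true: what the explicit check above relies on
}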

func (bst *BackupStoreTimer) Start() {
if bst == nil {
return
@@ -1620,136 +1628,136 @@
}
}

func (info *ClusterInfo) collectVolumesInfo() error {
volumesRO, err := info.ds.ListVolumesRO()
if err != nil {
return errors.Wrapf(err, "failed to list Longhorn Volumes")
}
volumeCount := len(volumesRO)
volumeCountV1 := 0
for _, volume := range volumesRO {
if volume.Spec.DataEngine == longhorn.DataEngineTypeV1 {
volumeCountV1++
}
}

var totalVolumeSize int
var totalVolumeActualSize int
var totalVolumeNumOfReplicas int
newStruct := func() map[util.StructName]int { return make(map[util.StructName]int, volumeCount) }
accessModeCountStruct := newStruct()
dataEngineCountStruct := newStruct()
dataLocalityCountStruct := newStruct()
frontendCountStruct := newStruct()
offlineReplicaRebuildingCountStruct := newStruct()
replicaAutoBalanceCountStruct := newStruct()
replicaSoftAntiAffinityCountStruct := newStruct()
replicaZoneSoftAntiAffinityCountStruct := newStruct()
replicaDiskSoftAntiAffinityCountStruct := newStruct()
restoreVolumeRecurringJobCountStruct := newStruct()
snapshotDataIntegrityCountStruct := newStruct()
unmapMarkSnapChainRemovedCountStruct := newStruct()
for _, volume := range volumesRO {
dataEngine := types.ValueUnknown
if volume.Spec.DataEngine != "" {
dataEngine = util.ConvertToCamel(string(volume.Spec.DataEngine), "-")
}
dataEngineCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeDataEngineCountFmt, dataEngine))]++

// TODO: Remove this condition when v2 volume actual size is implemented.
// https://github.com/longhorn/longhorn/issues/5947
isVolumeUsingV2DataEngine := datastore.IsDataEngineV2(volume.Spec.DataEngine)
if !isVolumeUsingV2DataEngine {
totalVolumeSize += int(volume.Spec.Size)
totalVolumeActualSize += int(volume.Status.ActualSize)
}
totalVolumeNumOfReplicas += volume.Spec.NumberOfReplicas

accessMode := types.ValueUnknown
if volume.Spec.AccessMode != "" {
accessMode = util.ConvertToCamel(string(volume.Spec.AccessMode), "-")
}
accessModeCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeAccessModeCountFmt, accessMode))]++

dataLocality := types.ValueUnknown
if volume.Spec.DataLocality != "" {
dataLocality = util.ConvertToCamel(string(volume.Spec.DataLocality), "-")
}
dataLocalityCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeDataLocalityCountFmt, dataLocality))]++

if volume.Spec.Frontend != "" && !isVolumeUsingV2DataEngine {
frontend := util.ConvertToCamel(string(volume.Spec.Frontend), "-")
frontendCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeFrontendCountFmt, frontend))]++
}

offlineReplicaRebuilding := info.collectSettingInVolume(string(volume.Spec.OfflineReplicaRebuilding), string(longhorn.OfflineReplicaRebuildingIgnored), types.SettingNameOfflineReplicaRebuilding)
offlineReplicaRebuildingCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeOfflineReplicaRebuildingCountFmt, util.ConvertToCamel(string(offlineReplicaRebuilding), "-")))]++

replicaAutoBalance := info.collectSettingInVolume(string(volume.Spec.ReplicaAutoBalance), string(longhorn.ReplicaAutoBalanceIgnored), types.SettingNameReplicaAutoBalance)
replicaAutoBalanceCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeReplicaAutoBalanceCountFmt, util.ConvertToCamel(string(replicaAutoBalance), "-")))]++

replicaSoftAntiAffinity := info.collectSettingInVolume(string(volume.Spec.ReplicaSoftAntiAffinity), string(longhorn.ReplicaSoftAntiAffinityDefault), types.SettingNameReplicaSoftAntiAffinity)
replicaSoftAntiAffinityCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeReplicaSoftAntiAffinityCountFmt, util.ConvertToCamel(string(replicaSoftAntiAffinity), "-")))]++

replicaZoneSoftAntiAffinity := info.collectSettingInVolume(string(volume.Spec.ReplicaZoneSoftAntiAffinity), string(longhorn.ReplicaZoneSoftAntiAffinityDefault), types.SettingNameReplicaZoneSoftAntiAffinity)
replicaZoneSoftAntiAffinityCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeReplicaZoneSoftAntiAffinityCountFmt, util.ConvertToCamel(string(replicaZoneSoftAntiAffinity), "-")))]++

replicaDiskSoftAntiAffinity := info.collectSettingInVolume(string(volume.Spec.ReplicaDiskSoftAntiAffinity), string(longhorn.ReplicaDiskSoftAntiAffinityDefault), types.SettingNameReplicaDiskSoftAntiAffinity)
replicaDiskSoftAntiAffinityCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeReplicaDiskSoftAntiAffinityCountFmt, util.ConvertToCamel(string(replicaDiskSoftAntiAffinity), "-")))]++

restoreVolumeRecurringJob := info.collectSettingInVolume(string(volume.Spec.RestoreVolumeRecurringJob), string(longhorn.RestoreVolumeRecurringJobDefault), types.SettingNameRestoreVolumeRecurringJobs)
restoreVolumeRecurringJobCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeRestoreVolumeRecurringJobCountFmt, util.ConvertToCamel(string(restoreVolumeRecurringJob), "-")))]++

snapshotDataIntegrity := info.collectSettingInVolume(string(volume.Spec.SnapshotDataIntegrity), string(longhorn.SnapshotDataIntegrityIgnored), types.SettingNameSnapshotDataIntegrity)
snapshotDataIntegrityCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeSnapshotDataIntegrityCountFmt, util.ConvertToCamel(string(snapshotDataIntegrity), "-")))]++

unmapMarkSnapChainRemoved := info.collectSettingInVolume(string(volume.Spec.UnmapMarkSnapChainRemoved), string(longhorn.UnmapMarkSnapChainRemovedIgnored), types.SettingNameRemoveSnapshotsDuringFilesystemTrim)
unmapMarkSnapChainRemovedCountStruct[util.StructName(fmt.Sprintf(ClusterInfoVolumeUnmapMarkSnapChainRemovedCountFmt, util.ConvertToCamel(string(unmapMarkSnapChainRemoved), "-")))]++
}
info.structFields.fields.AppendCounted(accessModeCountStruct)
info.structFields.fields.AppendCounted(dataEngineCountStruct)
info.structFields.fields.AppendCounted(dataLocalityCountStruct)
info.structFields.fields.AppendCounted(frontendCountStruct)
info.structFields.fields.AppendCounted(offlineReplicaRebuildingCountStruct)
info.structFields.fields.AppendCounted(replicaAutoBalanceCountStruct)
info.structFields.fields.AppendCounted(replicaSoftAntiAffinityCountStruct)
info.structFields.fields.AppendCounted(replicaZoneSoftAntiAffinityCountStruct)
info.structFields.fields.AppendCounted(replicaDiskSoftAntiAffinityCountStruct)
info.structFields.fields.AppendCounted(restoreVolumeRecurringJobCountStruct)
info.structFields.fields.AppendCounted(snapshotDataIntegrityCountStruct)
info.structFields.fields.AppendCounted(unmapMarkSnapChainRemovedCountStruct)

// TODO: Use the total volume count instead when v2 volume actual size is implemented.
// https://github.com/longhorn/longhorn/issues/5947
var avgVolumeSize int
var avgVolumeActualSize int
if volumeCountV1 > 0 && totalVolumeSize > 0 {
avgVolumeSize = totalVolumeSize / volumeCountV1

if totalVolumeActualSize > 0 {
avgVolumeActualSize = totalVolumeActualSize / volumeCountV1
}
}

var avgVolumeSnapshotCount int
var avgVolumeNumOfReplicas int
if volumeCount > 0 {
avgVolumeNumOfReplicas = totalVolumeNumOfReplicas / volumeCount

snapshotsRO, err := info.ds.ListSnapshotsRO(labels.Everything())
if err != nil {
return errors.Wrapf(err, "failed to list Longhorn Snapshots")
}
avgVolumeSnapshotCount = len(snapshotsRO) / volumeCount
}
info.structFields.fields.Append(ClusterInfoVolumeAvgSize, avgVolumeSize)
info.structFields.fields.Append(ClusterInfoVolumeAvgActualSize, avgVolumeActualSize)
info.structFields.fields.Append(ClusterInfoVolumeAvgSnapshotCount, avgVolumeSnapshotCount)
info.structFields.fields.Append(ClusterInfoVolumeAvgNumOfReplicas, avgVolumeNumOfReplicas)

return nil
}
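
Each tally above is a map keyed by a formatted struct-field name and incremented once per volume, and the averages are computed with integer division at the end, so they truncate. A small sketch of the pattern with plain maps; the field-name format and the camel-casing helper are illustrative, not the exact ClusterInfo constants or util.ConvertToCamel.

package main

import (
	"fmt"
	"strings"
)

// toCamel is a rough stand-in for util.ConvertToCamel: "read-write-once" -> "ReadWriteOnce".
func toCamel(s, sep string) string {
	parts := strings.Split(s, sep)
	for i, p := range parts {
		if p != "" {
			parts[i] = strings.ToUpper(p[:1]) + p[1:]
		}
	}
	return strings.Join(parts, "")
}

func main() {
	volumes := []struct {
		accessMode string
		size       int
	}{
		{"rwo", 10}, {"rwx", 20}, {"rwo", 40},
	}

	accessModeCount := map[string]int{}
	totalSize := 0
	for _, v := range volumes {
		// Hypothetical field-name format; the real code uses fmt constants like
		// ClusterInfoVolumeAccessModeCountFmt.
		accessModeCount[fmt.Sprintf("LonghornVolumeAccessMode%sCount", toCamel(v.accessMode, "-"))]++
		totalSize += v.size
	}

	avgSize := totalSize / len(volumes)               // integer division rounds down, like the telemetry above
	fmt.Println(accessModeCount, "avgSize:", avgSize) // avgSize: 23
}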

func (info *ClusterInfo) collectSettingInVolume(volumeSpecValue, ignoredValue string, settingName types.SettingName) string {
if volumeSpecValue == ignoredValue {
globalSetting, err := info.ds.GetSettingWithAutoFillingRO(settingName)