fix(manager): fix logic for when RWX workload is restarted after node… #3077

Merged: 1 commit, Sep 18, 2024
25 changes: 15 additions & 10 deletions controller/kubernetes_pod_controller.go
@@ -199,214 +199,207 @@
// relies on cluster network instead, the CSI plugin pod utilizes the host PID namespace
// . Consequently, the mount entry persists on the host namespace even after the
// CSI plugin pod is down.
func (kc *KubernetesPodController) handleWorkloadPodDeletionIfCSIPluginPodIsDown(csiPod *corev1.Pod) error {
logAbort := "Aborting deletion of RWX volume workload pods for NFS remount"
logSkip := "Skipping deletion of RWX volume workload pods for NFS remount"

log := getLoggerForPod(kc.logger, csiPod)

if csiPod.DeletionTimestamp.IsZero() {
return nil
}

storageNetworkSetting, err := kc.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
isStorageNetworkForRWXVolume, err := kc.ds.IsStorageNetworkForRWXVolume()
if err != nil {
log.WithError(err).Warnf("%s. Failed to get setting %v", logAbort, types.SettingNameStorageNetwork)
log.WithError(err).Warnf("%s. Failed to check isStorageNetwork", logAbort)
return nil
}

storageNetworkForRWXVolumeEnabled, err := kc.ds.GetSettingAsBool(types.SettingNameStorageNetworkForRWXVolumeEnabled)
if err != nil {
log.WithError(err).Warnf("%s. Failed to get setting %v", logAbort, types.SettingNameStorageNetworkForRWXVolumeEnabled)
return nil
}

if !types.IsStorageNetworkForRWXVolume(storageNetworkSetting, storageNetworkForRWXVolumeEnabled) {
if !isStorageNetworkForRWXVolume {
return nil
}

log.Info("CSI plugin pod on node is down, handling workload pods")

autoDeletePodWhenVolumeDetachedUnexpectedly, err := kc.ds.GetSettingAsBool(types.SettingNameAutoDeletePodWhenVolumeDetachedUnexpectedly)
if err != nil {
return err
}

if !autoDeletePodWhenVolumeDetachedUnexpectedly {
log.Warnf("%s. The setting %v is not enabled. Without restart the workload pod may lead to an unresponsive mount point", logAbort, types.SettingNameAutoDeletePodWhenVolumeDetachedUnexpectedly)
return nil
}

// Find relevant PersistentVolumes.
var persistentVolumes []*corev1.PersistentVolume
persistentVolume, err := kc.ds.ListPersistentVolumesRO()
if err != nil {
return err
}
for _, pv := range persistentVolume {
if pv.Spec.CSI != nil && pv.Spec.CSI.Driver == types.LonghornDriverName {
persistentVolumes = append(persistentVolumes, pv)
}
}

// Find RWX volumes.
var filteredVolumes []*longhorn.Volume
for _, persistentVolume := range persistentVolumes {
_log := log.WithField("volume", persistentVolume.Name)

volume, err := kc.ds.GetVolumeRO(persistentVolume.Name)
if err != nil {
if apierrors.IsNotFound(err) {
_log.WithError(err).Warnf("%s. Volume is not found", logSkip)
continue
}
return err
}

// Exclude non-RWX volumes.
if volume.Spec.AccessMode != longhorn.AccessModeReadWriteMany {
_log.Debugf("%s. Volume access mode is %v", logSkip, volume.Spec.AccessMode)
continue
}

filteredVolumes = append(filteredVolumes, volume)
}

// Find workload Pods to delete.
var filteredPods []*corev1.Pod
for _, volume := range filteredVolumes {
for _, workloadstatus := range volume.Status.KubernetesStatus.WorkloadsStatus {
_log := log.WithFields(logrus.Fields{
"volume": volume.Name,
"workloadPod": workloadstatus.PodName,
})

pod, err := kc.kubeClient.CoreV1().Pods(volume.Status.KubernetesStatus.Namespace).Get(context.TODO(), workloadstatus.PodName, metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
_log.WithError(err).Debugf("%s. Workload pod is not found", logSkip)
continue
}
return err
}

if !pod.DeletionTimestamp.IsZero() {
_log.Debugf("%s. Workload pod is being deleted", logSkip)
continue
}

if pod.CreationTimestamp.After(csiPod.DeletionTimestamp.Time) {
_log.WithFields(logrus.Fields{
"creationTimestamp": pod.CreationTimestamp,
"deletionTimestamp": csiPod.DeletionTimestamp},
).Infof("%s. Workload pod is created after CSI plugin pod deletion timestamp", logSkip)
}

// Only delete pod which has controller to make sure that the pod will be recreated by its controller
if metav1.GetControllerOf(pod) == nil {
_log.Warnf("%s. Workload pod is not managed by a controller", logSkip)
continue
}

if pod.Spec.NodeName == csiPod.Spec.NodeName {
kc.eventRecorder.Eventf(volume, corev1.EventTypeWarning, constant.EventReasonRemount, "Requesting workload pod %v deletion to remount NFS share after unexpected CSI plugin pod %v restart on node %v", pod.Name, csiPod.Name, csiPod.Spec.NodeName)
filteredPods = append(filteredPods, pod)
}
}
}

for _, pod := range filteredPods {
log.WithField("workloadPod", pod.Name).Info("Deleting workload pod on CSI plugin node")
err = kc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
if err != nil {
if datastore.ErrorIsNotFound(err) {
return nil
}
return err
}
}

return nil
}

// handlePodDeletionIfNodeDown determines whether we are allowed to forcefully delete a pod
// from a failed node based on the user's chosen NodeDownPodDeletionPolicy.
// This is necessary because Kubernetes never forcefully deletes pods on a down node;
// the pods are stuck in the terminating state forever and Longhorn volumes are not released.
// We provide an option for users to help them automatically force delete terminating pods
// of StatefulSet/Deployment on the downed node. By force deleting, k8s will detach Longhorn volumes
// and spin up replacement pods on a new node.
//
// Force delete a pod when all of the below conditions are met:
// 1. NodeDownPodDeletionPolicy is different from DoNothing
// 2. the pod belongs to a StatefulSet/Deployment, depending on NodeDownPodDeletionPolicy
// 3. the node containing the pod is down
// 4. the pod is terminating and the DeletionTimestamp has passed
// 5. the pod has a PV provisioned by driver.longhorn.io

func (kc *KubernetesPodController) handlePodDeletionIfNodeDown(pod *corev1.Pod, nodeID string, namespace string) error {
deletionPolicy := types.NodeDownPodDeletionPolicyDoNothing
if deletionSetting, err := kc.ds.GetSettingValueExisted(types.SettingNameNodeDownPodDeletionPolicy); err == nil {
deletionPolicy = types.NodeDownPodDeletionPolicy(deletionSetting)
}

shouldDelete := (deletionPolicy == types.NodeDownPodDeletionPolicyDeleteStatefulSetPod && isOwnedByStatefulSet(pod)) ||
(deletionPolicy == types.NodeDownPodDeletionPolicyDeleteDeploymentPod && isOwnedByDeployment(pod)) ||
(deletionPolicy == types.NodeDownPodDeletionPolicyDeleteBothStatefulsetAndDeploymentPod && (isOwnedByStatefulSet(pod) || isOwnedByDeployment(pod)))

if !shouldDelete {
return nil
}

isNodeDown, err := kc.ds.IsNodeDownOrDeleted(nodeID)
if err != nil {
return errors.Wrapf(err, "failed to evaluate Node %v for pod %v in handlePodDeletionIfNodeDown", nodeID, pod.Name)
}
if !isNodeDown {
return nil
}

if pod.DeletionTimestamp == nil {
return nil
}

// make sure the volumeattachments of the pods are gone first
// ref: https://github.com/longhorn/longhorn/issues/2947
volumeAttachments, err := kc.getVolumeAttachmentsOfPod(pod)
if err != nil {
return err
}
for _, va := range volumeAttachments {
if va.DeletionTimestamp == nil {
err := kc.kubeClient.StorageV1().VolumeAttachments().Delete(context.TODO(), va.Name, metav1.DeleteOptions{})
if err != nil {
if datastore.ErrorIsNotFound(err) {
continue
}
return err
}
kc.logger.Infof("%v: deleted volume attachment %v for pod %v on downed node %v", controllerAgentName, va.Name, pod.Name, nodeID)
}
// wait the volumeattachment object to be deleted
kc.logger.Infof("%v: wait for volume attachment %v for pod %v on downed node %v to be deleted", controllerAgentName, va.Name, pod.Name, nodeID)
return nil
}

if pod.DeletionTimestamp.After(time.Now()) {
return nil
}

gracePeriod := int64(0)
err = kc.kubeClient.CoreV1().Pods(namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriod,
})
if err != nil {
return errors.Wrapf(err, "failed to forcefully delete Pod %v on the downed Node %v in handlePodDeletionIfNodeDown", pod.Name, nodeID)
}
kc.logger.Infof("%v: Forcefully deleted pod %v on downed node %v", controllerAgentName, pod.Name, nodeID)

return nil
}

func (kc *KubernetesPodController) getVolumeAttachmentsOfPod(pod *corev1.Pod) ([]*storagev1.VolumeAttachment, error) {
var res []*storagev1.VolumeAttachment
volumeAttachments, err := kc.ds.ListVolumeAttachmentsRO()
@@ -452,76 +445,88 @@

// handlePodDeletionIfVolumeRequestRemount will delete the pod which is using a volume that has requested remount.
// By deleting the consuming pod, Kubernetes will recreate it, reattach, and remount the volume.
func (kc *KubernetesPodController) handlePodDeletionIfVolumeRequestRemount(pod *corev1.Pod) error {
// Only handle pod that is on the same node as this manager
if pod.Spec.NodeName != kc.controllerID {
return nil
}

autoDeletePodWhenVolumeDetachedUnexpectedly, err := kc.ds.GetSettingAsBool(types.SettingNameAutoDeletePodWhenVolumeDetachedUnexpectedly)
if err != nil {
return err
}
if !autoDeletePodWhenVolumeDetachedUnexpectedly {
return nil
}

// Only delete pod which has controller to make sure that the pod will be recreated by its controller
if metav1.GetControllerOf(pod) == nil {
return nil
}

volumeList, err := kc.getAssociatedVolumes(pod)
if err != nil {
return err
}

isStorageNetworkForRWXVolume, err := kc.ds.IsStorageNetworkForRWXVolume()
if err != nil {
kc.logger.WithError(err).Warn("Failed to check isStorageNetwork, assuming not")
}

// Only delete pod which has startTime < vol.Status.RemountRequestAt AND timeNow > vol.Status.RemountRequestAt + delayDuration
// The delayDuration is to make sure that we don't repeatedly delete the pod too fast
// when vol.Status.RemountRequestAt is updated too quickly by volumeController
if pod.Status.StartTime == nil {
return nil
}

// Avoid repeat deletion
if pod.DeletionTimestamp != nil {
return nil
}

podStartTime := pod.Status.StartTime.Time
for _, vol := range volumeList {
if vol.Status.RemountRequestedAt == "" {
continue
}

// NFS clients can generally recover without a restart/remount when the NFS server restarts using the same Cluster IP.
// A remount is required when the storage network for RWX is in use because the new NFS server has a different IP.
if isRegularRWXVolume(vol) && !isStorageNetworkForRWXVolume {
continue
}

remountRequestedAt, err := time.Parse(time.RFC3339, vol.Status.RemountRequestedAt)
if err != nil {
return err
}

timeNow := time.Now()
if podStartTime.Before(remountRequestedAt) {
if !timeNow.After(remountRequestedAt.Add(remountRequestDelayDuration)) {
kc.logger.Infof("Current time is not %v seconds after request remount, requeue the pod %v to handle it later", remountRequestDelayDuration.Seconds(), pod.GetName())
kc.enqueuePodAfter(pod, remountRequestDelayDuration)
return nil
}

gracePeriod := int64(30)
err := kc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.GetName(), metav1.DeleteOptions{
GracePeriodSeconds: &gracePeriod,
})
if err != nil && !datastore.ErrorIsNotFound(err) {
return err
}
kc.logger.Infof("Deleted pod %v so that Kubernetes will handle remounting volume %v", pod.GetName(), vol.GetName())
return nil
}

}

return nil
}

func isOwnedByStatefulSet(pod *corev1.Pod) bool {
if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil {
return ownerRef.Kind == types.KubernetesStatefulSet
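The diff above concentrates the restart decision for RWX workload pods in handleWorkloadPodDeletionIfCSIPluginPodIsDown. As a reading aid only, here is a minimal sketch of the per-pod checks that function applies, assuming the two settings have already been resolved; the helper name and signature are illustrative and not part of this PR.

package sketch

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// shouldRestartWorkloadPod condenses the eligibility checks: the CSI plugin pod must be
// terminating, the storage network must be serving RWX volumes (so the NFS endpoint IP
// changes on restart), auto-delete must be allowed, and the workload pod must be
// controller-managed, not already terminating, and scheduled on the same node as the
// CSI plugin pod.
func shouldRestartWorkloadPod(csiPod, workloadPod *corev1.Pod, storageNetworkForRWX, autoDeleteEnabled bool) bool {
	if csiPod.DeletionTimestamp.IsZero() {
		return false // CSI plugin pod is not going down; nothing to remount
	}
	if !storageNetworkForRWX || !autoDeleteEnabled {
		return false // restart-to-remount only applies to storage-network RWX setups with auto-delete enabled
	}
	if !workloadPod.DeletionTimestamp.IsZero() {
		return false // workload pod is already being deleted
	}
	if metav1.GetControllerOf(workloadPod) == nil {
		return false // no controller to recreate the pod after deletion
	}
	return workloadPod.Spec.NodeName == csiPod.Spec.NodeName
}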
20 changes: 3 additions & 17 deletions controller/share_manager_controller.go
@@ -451,7 +451,7 @@ func (c *ShareManagerController) syncShareManagerEndpoint(sm *longhorn.ShareMana
return nil
}

storageNetworkForRWXVolume, err := c.isStorageNetworkForRWXVolume()
storageNetworkForRWXVolume, err := c.ds.IsStorageNetworkForRWXVolume()
if err != nil {
return err
}
@@ -1045,20 +1045,6 @@ func (c *ShareManagerController) getShareManagerTolerationsFromStorageClass(sc *
return tolerations
}

func (c *ShareManagerController) isStorageNetworkForRWXVolume() (bool, error) {
storageNetwork, err := c.ds.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
if err != nil {
return false, errors.Wrapf(err, "failed to get setting value %v", types.SettingNameStorageNetwork)
}

storageNetworkForRWXVolumeEnabled, err := c.ds.GetSettingAsBool(types.SettingNameStorageNetworkForRWXVolumeEnabled)
if err != nil {
return false, errors.Wrapf(err, "failed to get setting value %v", types.SettingNameStorageNetworkForRWXVolumeEnabled)
}

return types.IsStorageNetworkForRWXVolume(storageNetwork, storageNetworkForRWXVolumeEnabled), nil
}

func (c *ShareManagerController) checkStorageNetworkApplied() (bool, error) {
targetSettings := []types.SettingName{types.SettingNameStorageNetwork, types.SettingNameStorageNetworkForRWXVolumeEnabled}
for _, item := range targetSettings {
@@ -1091,7 +1077,7 @@ func (c *ShareManagerController) canCleanupService(shareManagerName string) (boo
return false, nil
}

storageNetworkForRWXVolume, err := c.isStorageNetworkForRWXVolume()
storageNetworkForRWXVolume, err := c.ds.IsStorageNetworkForRWXVolume()
if err != nil {
return false, err
}
@@ -1362,7 +1348,7 @@ func (c *ShareManagerController) createServiceManifest(sm *longhorn.ShareManager

log := getLoggerForShareManager(c.logger, sm)

storageNetworkForRWXVolume, err := c.isStorageNetworkForRWXVolume()
storageNetworkForRWXVolume, err := c.ds.IsStorageNetworkForRWXVolume()
if err != nil {
log.WithError(err).Warnf("Failed to check storage network for RWX volume")
}
6 changes: 3 additions & 3 deletions controller/volume_controller.go
@@ -4509,9 +4509,9 @@ func (c *VolumeController) ReconcileShareManagerState(volume *longhorn.Volume) e
log.Infof("Updated image for share manager from %v to %v", sm.Spec.Image, c.smImage)
}

// kill the workload pods, when the share manager goes into error state
// easiest approach is to set the RemountRequestedAt variable,
// since that is already responsible for killing the workload pods
// Give the workload pods a chance to restart when the share manager goes into error state.
// Easiest approach is to set the RemountRequestedAt variable. Pods will make that decision
// in the kubernetes_pod_controller.
if sm.Status.State == longhorn.ShareManagerStateError || sm.Status.State == longhorn.ShareManagerStateUnknown {
volume.Status.RemountRequestedAt = c.nowHandler()
msg := fmt.Sprintf("Volume %v requested remount at %v", volume.Name, volume.Status.RemountRequestedAt)
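The reworded comment above relies on the timing rule in handlePodDeletionIfVolumeRequestRemount: a consuming pod is only deleted when it started before the remount request and the request has aged past the requeue delay. A minimal sketch of that predicate, with a hypothetical helper name:

package sketch

import "time"

// shouldDeleteForRemount returns true only when the pod predates the remount request and the
// request is older than the delay, which prevents repeated deletions while the volume
// controller is still updating RemountRequestedAt.
func shouldDeleteForRemount(podStart, remountRequestedAt, now time.Time, delay time.Duration) bool {
	return podStart.Before(remountRequestedAt) && now.After(remountRequestedAt.Add(delay))
}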
14 changes: 14 additions & 0 deletions datastore/longhorn.go
@@ -5576,3 +5576,17 @@ func (s *DataStore) IsPVMountOptionReadOnly(volume *longhorn.Volume) (bool, erro
}
return false, nil
}

func (s *DataStore) IsStorageNetworkForRWXVolume() (bool, error) {
storageNetworkSetting, err := s.GetSettingWithAutoFillingRO(types.SettingNameStorageNetwork)
if err != nil {
return false, errors.Wrapf(err, "Failed to get setting %v", types.SettingNameStorageNetwork)
}

storageNetworkForRWXVolumeEnabled, err := s.GetSettingAsBool(types.SettingNameStorageNetworkForRWXVolumeEnabled)
if err != nil {
return false, errors.Wrapf(err, "Failed to get setting %v", types.SettingNameStorageNetworkForRWXVolumeEnabled)
}

return types.IsStorageNetworkForRWXVolume(storageNetworkSetting, storageNetworkForRWXVolumeEnabled), nil
}
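With the helper consolidated in the DataStore, callers in the controllers follow the same pattern: check it once and, on error, fall back to treating the storage network as not used for RWX. A minimal sketch of that caller pattern under an assumed interface; settingReader and storageNetworkForRWX are illustrative names, not part of the PR.

package sketch

import "github.com/sirupsen/logrus"

// settingReader is a stand-in for the slice of datastore.DataStore used here.
type settingReader interface {
	IsStorageNetworkForRWXVolume() (bool, error)
}

// storageNetworkForRWX consults the shared helper and treats a read failure as
// "storage network not used for RWX", the conservative fallback seen in the controllers.
func storageNetworkForRWX(ds settingReader, log logrus.FieldLogger) bool {
	ok, err := ds.IsStorageNetworkForRWXVolume()
	if err != nil {
		log.WithError(err).Warn("Failed to check storage network for RWX volume, assuming it is not used")
		return false
	}
	return ok
}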