Improve the resync logic for node network status
Sometimes, when an nsx-node-agent pod has been created
and running for less than 180 seconds, the operator
tries to update the node status twice: it first sets
network-unavailable=true, then sleeps and tries to set
network-unavailable=false after 180 seconds [1].

The code has a redundant check before sleeping: in that
check, the Get API reads the node status from a cache
that may not yet be synced after the first update has
been executed, so an unexpected "Node condition is not
changed" is reported and the taints cannot be removed
until the removal logic is accidentally triggered by
another event from the nsx-node-agent pod. This patch
removes the redundant check.
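
As a hedged illustration of why that check is unreliable (a minimal Go
sketch with hypothetical names, not the operator's code): the default
controller-runtime client reads from an informer cache, so a Get issued
right after Status().Update() can still return the pre-update condition,
and a "did anything change?" guard built on such a read may wrongly skip
the follow-up update.

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	client "sigs.k8s.io/controller-runtime/pkg/client"
)

// cachedNetworkUnavailable reports the NetworkUnavailable condition as seen
// by the (possibly stale) cached client; a decision based on this value
// immediately after a status update can therefore be wrong.
func cachedNetworkUnavailable(ctx context.Context, c client.Client, nodeName string) (bool, error) {
	node := &corev1.Node{}
	if err := c.Get(ctx, types.NamespacedName{Name: nodeName}, node); err != nil {
		return false, err
	}
	for _, cond := range node.Status.Conditions {
		if cond.Type == corev1.NodeNetworkUnavailable {
			return cond.Status == corev1.ConditionTrue, nil
		}
	}
	// No condition recorded yet; treat the network as available.
	return false, nil
}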

We also assume that the data read by the client will
eventually be correct but may be slightly out of date,
so this patch introduces assertNodeStatus to ensure
that the final status is the expected one.

This patch also replaces the goroutine with
RequeueAfter, which is a more native and less
error-prone implementation.
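
The RequeueAfter replacement can be sketched as follows (a minimal,
hypothetical example; gracePeriod stands in for TimeBeforeRecoverNetwork
and the function name is illustrative): instead of sleeping in a detached
goroutine and updating the status later, the reconciler returns the
remaining grace time in the reconcile.Result so controller-runtime calls
it again at the right moment.

package sketch

import (
	"time"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// gracePeriod stands in for operatortypes.TimeBeforeRecoverNetwork.
const gracePeriod = 180 * time.Second

// requeueForGracePeriod returns an empty Result once the pod has been running
// long enough to mark the node ready, and otherwise asks controller-runtime
// to reconcile again after the remaining grace time.
func requeueForGracePeriod(startedAt time.Time) reconcile.Result {
	elapsed := time.Since(startedAt)
	if elapsed >= gracePeriod {
		return reconcile.Result{} // safe to set NetworkUnavailable=false now
	}
	return reconcile.Result{RequeueAfter: gracePeriod - elapsed}
}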

[1] The following logs show this case:

{"level":"info","ts":"2021-03-08T14:56:37.864Z","logger":"status_manager","msg":"nsx-node-agent-p8ss5/nsx-kube-proxy for node compute-2 started for less than 17.864554094s"}
{"level":"info","ts":"2021-03-08T14:56:37.864Z","logger":"status_manager","msg":"nsx-node-agent-p8ss5/nsx-node-agent for node compute-2 started for less than 17.864554094s"}
{"level":"info","ts":"2021-03-08T14:56:37.864Z","logger":"status_manager","msg":"nsx-node-agent-p8ss5/nsx-ovs for node compute-2 started for less than 17.864554094s"}
{"level":"info","ts":"2021-03-08T14:56:37.864Z","logger":"status_manager","msg":"Setting status NetworkUnavailable to true for node compute-2"}
{"level":"info","ts":"2021-03-08T14:56:37.876Z","logger":"status_manager","msg":"Updated node condition NetworkUnavailable to true for node compute-2"}
{"level":"info","ts":"2021-03-08T14:56:37.876Z","logger":"status_manager","msg":"Node condition is not changed"}
...
{"level":"info","ts":"2021-03-08T15:26:13.541Z","logger":"status_manager","msg":"Setting status NetworkUnavailable to false for node compute-2"}
{"level":"info","ts":"2021-03-08T15:26:13.541Z","logger":"status_manager","msg":"Setting status NetworkUnavailable to false for node compute-2 after -26m53.541741583s"}
heypnus committed Mar 26, 2021
1 parent fced14c commit 4698114
Showing 5 changed files with 90 additions and 77 deletions.
16 changes: 6 additions & 10 deletions pkg/controller/pod/pod_controller.go
@@ -9,7 +9,6 @@ import (
"fmt"
"io"
"strings"
"time"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/cluster-network-operator/pkg/apply"
@@ -42,11 +41,6 @@ var SetControllerReference = controllerutil.SetControllerReference

var ApplyObject = apply.ApplyObject

// The periodic resync interval.
// We will re-run the reconciliation logic, even if the NCP configuration
// hasn't changed.
var ResyncPeriod = 2 * time.Minute

var firstBoot = true

// Add creates a new Pod Controller and adds it to the Manager. The Manager will set fields on the Controller
@@ -189,15 +183,17 @@ func (r *ReconcilePod) isForNsxNodeAgentPod(request reconcile.Request) bool {
func (r *ReconcilePod) Reconcile(request reconcile.Request) (reconcile.Result, error) {
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)

if err := r.status.CheckExistingAgentPods(&firstBoot, r.sharedInfo); err != nil {
return reconcile.Result{Requeue: true}, err
result, err := r.status.CheckExistingAgentPods(&firstBoot, r.sharedInfo)
emptyResult := reconcile.Result{}
if result != emptyResult || err != nil {
return result, err
}

if !r.isForNcpDeployOrNodeAgentDS(request) {
// the request is not for the ncp deployment or nsx-node-agent ds, but for an nsx-node-agent pod
if r.isForNsxNodeAgentPod(request) {
reqLogger.Info("Reconciling pod update for network status")
r.status.SetNodeConditionFromPod(request.NamespacedName, r.sharedInfo, nil)
return r.status.SetNodeConditionFromPod(request.NamespacedName, r.sharedInfo, nil)
}
return reconcile.Result{}, nil
}
@@ -216,7 +212,7 @@ func (r *ReconcilePod) Reconcile(request reconcile.Request) (reconcile.Result, e
}
}

return reconcile.Result{RequeueAfter: ResyncPeriod}, nil
return reconcile.Result{RequeueAfter: operatortypes.DefaultResyncPeriod}, nil
}

func (r *ReconcilePod) recreateNsxNcpResourceIfDeleted(resName string) error {
16 changes: 8 additions & 8 deletions pkg/controller/pod/pod_controller_test.go
@@ -174,8 +174,8 @@ func (r *ReconcilePod) testReconcileOnWatchedResource(t *testing.T) {
if err != nil {
t.Fatalf("reconcile: (%v)", err)
}
if res.RequeueAfter != ResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v", ResyncPeriod)
if res.RequeueAfter != operatortypes.DefaultResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v", operatortypes.DefaultResyncPeriod)
}
r.client.Delete(context.TODO(), ncpDeployment)
}
@@ -213,8 +213,8 @@ func (r *ReconcilePod) testReconcileOnWatchedResourceWhenDeleted(t *testing.T) {
if err != nil {
t.Fatalf("reconcile: (%v)", err)
}
if res.RequeueAfter != ResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v", ResyncPeriod)
if res.RequeueAfter != operatortypes.DefaultResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v", operatortypes.DefaultResyncPeriod)
}

// Validate that reconcile recreated the deployment
@@ -289,9 +289,9 @@ func (r *ReconcilePod) testReconcileOnCLBNsxNodeAgentInvalidResolvConf(
if err != nil {
t.Fatalf("reconcile: (%v)", err)
}
if res.RequeueAfter != ResyncPeriod {
if res.RequeueAfter != operatortypes.DefaultResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v but it did "+
"after %v", ResyncPeriod, res.RequeueAfter)
"after %v", operatortypes.DefaultResyncPeriod, res.RequeueAfter)
}
obj := &corev1.Pod{}
namespacedName := types.NamespacedName{
@@ -336,9 +336,9 @@ func (r *ReconcilePod) testReconcileOnCLBNsxNodeAgentInvalidResolvConf(
if err != nil {
t.Fatalf("reconcile: (%v)", err)
}
if res.RequeueAfter != ResyncPeriod {
if res.RequeueAfter != operatortypes.DefaultResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v but it did "+
"after %v", ResyncPeriod, res.RequeueAfter)
"after %v", operatortypes.DefaultResyncPeriod, res.RequeueAfter)
}
obj = &corev1.Pod{}
err = c.Get(context.TODO(), namespacedName, obj)
1 change: 0 additions & 1 deletion pkg/controller/sharedinfo/shared_info.go
@@ -26,7 +26,6 @@ var log = logf.Log.WithName("shared_info")
type SharedInfo struct {
AdaptorName string
AddNodeTag bool
LastNetworkAvailable map[string]time.Time
LastNodeAgentStartTime map[string]time.Time
NetworkConfig *configv1.Network
OperatorConfigMap *corev1.ConfigMap
133 changes: 75 additions & 58 deletions pkg/controller/statusmanager/pod_status.go
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
client "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var log = logf.Log.WithName("status_manager")
@@ -64,7 +65,7 @@ type deploymentState struct {
LastChangeTime time.Time
}

func FindStatusCondition(conditions []corev1.NodeCondition, conditionType corev1.NodeConditionType) *corev1.NodeCondition {
func FindNodeStatusCondition(conditions []corev1.NodeCondition, conditionType corev1.NodeConditionType) *corev1.NodeCondition {
for i := range conditions {
if conditions[i].Type == conditionType {
return &conditions[i]
@@ -78,7 +79,7 @@ func SetNodeCondition(conditions *[]corev1.NodeCondition, newCondition corev1.No
if conditions == nil {
conditions = &[]corev1.NodeCondition{}
}
existingCondition := FindStatusCondition(*conditions, newCondition.Type)
existingCondition := FindNodeStatusCondition(*conditions, newCondition.Type)
if existingCondition == nil {
newCondition.LastTransitionTime = metav1.NewTime(time.Now())
*conditions = append(*conditions, newCondition)
@@ -98,77 +99,96 @@ func SetNodeCondition(conditions *[]corev1.NodeCondition, newCondition corev1.No
// CheckExistingAgentPods is for a case: nsx-node-agent becomes unhealthy -> NetworkUnavailable=True -> operator off ->
// nsx-node-agent becomes healthy and keeps running -> operator up -> operator cannot receive nsx-node-agent event to
// set NetworkUnavailable=False. So a full sync at the start time is necessary.
func (status *StatusManager) CheckExistingAgentPods(firstBoot *bool, sharedInfo *sharedinfo.SharedInfo) error {
func (status *StatusManager) CheckExistingAgentPods(firstBoot *bool, sharedInfo *sharedinfo.SharedInfo) (reconcile.Result, error) {
status.Lock()
defer status.Unlock()

if !*firstBoot {
return nil
return reconcile.Result{}, nil
}
log.Info("Checking all nsx-node-agent pods for node condition")
pods := &corev1.PodList{}
err := status.client.List(context.TODO(), pods, client.MatchingLabels{"component": operatortypes.NsxNodeAgentContainerName})
if err != nil {
log.Error(err, "Error getting pods for node condition")
return err
return reconcile.Result{}, err
}
if len(pods.Items) == 0 {
log.Info("nsx-node-agent not found for node condition")
nodes := corev1.NodeList{}
err = status.client.List(context.TODO(), &nodes)
if err != nil {
log.Error(err, "Failed to get nodes for condition updating")
return err
return reconcile.Result{}, err
}
for _, node := range nodes.Items {
status.setNodeNetworkUnavailable(node.ObjectMeta.Name, false, nil, "NSXNodeAgent", "Waiting for nsx-node-agent to be created", sharedInfo)
// In this case the nsx-node-agent is not created yet so we don't need to care about the graceful time
_, err := status.setNodeNetworkUnavailable(node.ObjectMeta.Name, false, nil, "NSXNodeAgent", "Waiting for nsx-node-agent to be created", sharedInfo)
if err != nil {
return reconcile.Result{}, err
}
}
*firstBoot = false
return nil
return reconcile.Result{}, nil
}
podName := types.NamespacedName{}
for _, pod := range pods.Items {
status.setNodeConditionFromPod(podName, sharedInfo, &pod)
if _, err := status.setNodeConditionFromPod(podName, sharedInfo, &pod); err != nil {
return reconcile.Result{}, err
}
}
*firstBoot = false
return nil
return reconcile.Result{}, nil
}

func (status *StatusManager) setNodeNetworkUnavailable(nodeName string, ready bool, startedAt *time.Time, reason string, message string, sharedInfo *sharedinfo.SharedInfo) {
if sharedInfo.LastNetworkAvailable == nil {
sharedInfo.LastNetworkAvailable = make(map[string]time.Time)
// In general, write your controller with the assumption that information will eventually be correct, but may be slightly out of date.
// And assume that writes need to be repeated if their effect does not show up after a given time (e.g. by using a requeue result).
// For more details, refer to https://github.com/kubernetes-sigs/controller-runtime/blob/v0.5.2/FAQ.md#q-my-cache-might-be-stale-if-i-read-from-a-cache-how-should-i-deal-with-that
func (status *StatusManager) assertNodeStatus(nodeName string, desireReady bool) (reconcile.Result, error) {
node := &corev1.Node{}
err := status.client.Get(context.TODO(), types.NamespacedName{Name: nodeName}, node)
if err != nil {
log.Error(err, fmt.Sprintf("Failed to check status updating for node %s", nodeName))
return reconcile.Result{}, err
}
if ready {
log.V(1).Info(fmt.Sprintf("Last network unavailable time of node %s is set to %s", nodeName, *startedAt))
sharedInfo.LastNetworkAvailable[nodeName] = *startedAt
existingCondition := FindNodeStatusCondition(node.Status.Conditions, "NetworkUnavailable")
if existingCondition == nil {
log.Info(fmt.Sprintf("Expecting node %s ready=%v, observed nil", nodeName, desireReady))
} else if desireReady && existingCondition.Status != corev1.ConditionFalse {
log.Info(fmt.Sprintf("Expecting node %s ready=true, observed false", nodeName))
} else if !desireReady && existingCondition.Status == corev1.ConditionFalse {
log.Info(fmt.Sprintf("Expecting node %s ready=false, observed true", nodeName))
} else {
delete(sharedInfo.LastNetworkAvailable, nodeName)
return reconcile.Result{}, nil
}
_, changed := status.combineNode(nodeName, ready, reason, message)
if !changed {
log.Info("Node condition is not changed")
return
}
log.Info(fmt.Sprintf("Setting status NetworkUnavailable to %v for node %s", !ready, nodeName))
return reconcile.Result{RequeueAfter: operatortypes.DefaultResyncPeriod}, nil
}

func (status *StatusManager) setNodeNetworkUnavailable(nodeName string, ready bool, startedAt *time.Time, reason string, message string, sharedInfo *sharedinfo.SharedInfo) (reconcile.Result, error) {
log.V(1).Info(fmt.Sprintf("Setting status NetworkUnavailable to %v for node %s", !ready, nodeName))
if !ready {
status.updateNodeStatus(nodeName, ready, reason, message)
return
err := status.updateNodeStatus(nodeName, ready, reason, message)
if err != nil {
return reconcile.Result{}, err
}
return status.assertNodeStatus(nodeName, ready)
}
// When the network status looks ready, we still need to wait for a graceful time
// When the network status looks ready, we should check whether we still need to wait for the graceful time
now := time.Now()
startedTime := now.Sub(*startedAt)
go func() {
sleepTime := operatortypes.TimeBeforeRecoverNetwork - startedTime
time.Sleep(sleepTime)
if (sharedInfo.LastNetworkAvailable[nodeName]) != *startedAt {
log.V(1).Info(fmt.Sprintf("Last network unavailable time of node %s changed, expecting %v, got %v, goroutine exiting",
nodeName, *startedAt, sharedInfo.LastNetworkAvailable[nodeName]))
return
if startedTime >= operatortypes.TimeBeforeRecoverNetwork {
err := status.updateNodeStatus(nodeName, ready, reason, message)
if err != nil {
return reconcile.Result{}, err
}
log.Info(fmt.Sprintf("Setting status NetworkUnavailable to %v for node %s after %v", !ready, nodeName, sleepTime))
status.updateNodeStatus(nodeName, ready, reason, message)
return
}()
return status.assertNodeStatus(nodeName, ready)
}
if err := status.updateNodeStatus(nodeName, false, reason, message); err != nil {
return reconcile.Result{}, err
}
sleepTime := operatortypes.TimeBeforeRecoverNetwork - startedTime
log.Info(fmt.Sprintf("Waiting %v to double check network status for node %s", sleepTime, nodeName))
return reconcile.Result{RequeueAfter: sleepTime}, nil
}

func (status *StatusManager) combineNode(nodeName string, ready bool, reason string, message string) (*corev1.Node, bool) {
@@ -198,37 +218,40 @@ func (status *StatusManager) combineNode(nodeName string, ready bool, reason str
return node, changed
}

func (status *StatusManager) updateNodeStatus(nodeName string, ready bool, reason string, message string) {
func (status *StatusManager) updateNodeStatus(nodeName string, ready bool, reason string, message string) error {
// need to retrieve the node info, otherwise the update may fail because the node has been modified while the monitor slept
node, changed := status.combineNode(nodeName, ready, reason, message)
if !changed {
log.Info("Node condition is not changed, skip updating")
return
log.V(1).Info(fmt.Sprintf("Node condition is not changed (nodeName: %s, reason: %s, message: %s), skip updating", nodeName, reason, message))
return nil
}
err := status.client.Status().Update(context.TODO(), node)
if err != nil {
if err := status.client.Status().Update(context.TODO(), node); err != nil {
log.Error(err, fmt.Sprintf("Failed to update node condition NetworkUnavailable to %v for node %s", !ready, nodeName))
} else {
log.Info(fmt.Sprintf("Updated node condition NetworkUnavailable to %v for node %s", !ready, nodeName))
return err
}
log.Info(fmt.Sprintf("Updated node condition NetworkUnavailable to %v for node %s", !ready, nodeName))
return nil
}

// Get the pod status from API server
func (status *StatusManager) SetNodeConditionFromPod(podName types.NamespacedName, sharedInfo *sharedinfo.SharedInfo, pod *corev1.Pod) {
func (status *StatusManager) SetNodeConditionFromPod(podName types.NamespacedName, sharedInfo *sharedinfo.SharedInfo, pod *corev1.Pod) (reconcile.Result, error) {
status.Lock()
defer status.Unlock()
status.setNodeConditionFromPod(podName, sharedInfo, pod)
return status.setNodeConditionFromPod(podName, sharedInfo, pod)
}

func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedName, sharedInfo *sharedinfo.SharedInfo, pod *corev1.Pod) {
func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedName, sharedInfo *sharedinfo.SharedInfo, pod *corev1.Pod) (reconcile.Result, error) {
var reason string
now := time.Now()
if pod == nil {
pod = &corev1.Pod{}
err := status.client.Get(context.TODO(), podName, pod)
if err != nil {
if err := status.client.Get(context.TODO(), podName, pod); err != nil {
log.Error(err, fmt.Sprintf("Error getting %s for node condition", operatortypes.NsxNodeAgentContainerName))
return
isNotFound := errors.IsNotFound(err)
if isNotFound {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
}
containerStatus := corev1.ContainerStatus{}
Expand All @@ -252,7 +275,7 @@ func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedNam
nodeLastStartedAt := sharedInfo.LastNodeAgentStartTime[nodeName]
if podStartedAt.Before(nodeLastStartedAt) {
log.Info(fmt.Sprintf("Pod %s started at %v on node %s is outdated, there's new pod started at %v", podName, podStartedAt, nodeName, nodeLastStartedAt))
return
return reconcile.Result{}, nil
} else {
sharedInfo.LastNodeAgentStartTime[nodeName] = podStartedAt
}
@@ -303,13 +326,7 @@ func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedNam
}
}
}
if len(tmpMsgs) > 0 {
// if the pod just restarted, we still set the ready to false then try to set it to true,
// i.e. invoke the setNodeNetworkUnavailable 2 times because there's no sequent pod status change to
// trigger the pod controller to set ready to true.
status.setNodeNetworkUnavailable(nodeName, false, nil, "NSXNodeAgent", messages + tmpMsgs, sharedInfo)
}
status.setNodeNetworkUnavailable(nodeName, ready, lastStartedAt, "NSXNodeAgent", messages, sharedInfo)
return status.setNodeNetworkUnavailable(nodeName, ready, lastStartedAt, "NSXNodeAgent", messages + tmpMsgs, sharedInfo)
}

// SetFromPodsForOverall sets the operator Degraded/Progressing/Available status, based on
1 change: 1 addition & 0 deletions pkg/types/names.go
@@ -36,4 +36,5 @@ const (
NsxNodeAgentContainerName string = "nsx-node-agent"
OsReleaseFile string = "/host/etc/os-release"
TimeBeforeRecoverNetwork time.Duration = 180 * time.Second
DefaultResyncPeriod time.Duration = 2 * time.Minute
)
