Improve the resync logic for node network status #112

Merged
merged 1 commit on Mar 26, 2021
16 changes: 6 additions & 10 deletions pkg/controller/pod/pod_controller.go
@@ -9,7 +9,6 @@ import (
"fmt"
"io"
"strings"
"time"

configv1 "github.com/openshift/api/config/v1"
"github.com/openshift/cluster-network-operator/pkg/apply"
@@ -42,11 +41,6 @@ var SetControllerReference = controllerutil.SetControllerReference

var ApplyObject = apply.ApplyObject

// The periodic resync interval.
// We will re-run the reconciliation logic, even if the NCP configuration
// hasn't changed.
var ResyncPeriod = 2 * time.Minute

var firstBoot = true

// Add creates a new Pod Controller and adds it to the Manager. The Manager will set fields on the Controller
@@ -189,15 +183,17 @@ func (r *ReconcilePod) isForNsxNodeAgentPod(request reconcile.Request) bool {
func (r *ReconcilePod) Reconcile(request reconcile.Request) (reconcile.Result, error) {
reqLogger := log.WithValues("Request.Namespace", request.Namespace, "Request.Name", request.Name)

if err := r.status.CheckExistingAgentPods(&firstBoot, r.sharedInfo); err != nil {
return reconcile.Result{Requeue: true}, err
result, err := r.status.CheckExistingAgentPods(&firstBoot, r.sharedInfo)
emptyResult := reconcile.Result{}
if result != emptyResult || err != nil {
return result, err
}

if !r.isForNcpDeployOrNodeAgentDS(request) {
// the request is not for the ncp deployment or nsx-node-agent ds, but for an nsx-node-agent pod
if r.isForNsxNodeAgentPod(request) {
reqLogger.Info("Reconciling pod update for network status")
r.status.SetNodeConditionFromPod(request.NamespacedName, r.sharedInfo, nil)
return r.status.SetNodeConditionFromPod(request.NamespacedName, r.sharedInfo, nil)
}
return reconcile.Result{}, nil
}
@@ -216,7 +212,7 @@ func (r *ReconcilePod) Reconcile(request reconcile.Request) (reconcile.Result, e
}
}

return reconcile.Result{RequeueAfter: ResyncPeriod}, nil
return reconcile.Result{RequeueAfter: operatortypes.DefaultResyncPeriod}, nil
}

func (r *ReconcilePod) recreateNsxNcpResourceIfDeleted(resName string) error {
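The change above drops the package-level ResyncPeriod and instead propagates the reconcile.Result returned by the status manager, falling back to a periodic requeue via operatortypes.DefaultResyncPeriod. Below is a minimal, self-contained sketch of that requeue-driven flow; the Result type and the statusCheck helper are simplified stand-ins, not the operator's real types.

package main

import (
	"fmt"
	"time"
)

// Result mirrors the shape of controller-runtime's reconcile.Result.
type Result struct {
	Requeue      bool
	RequeueAfter time.Duration
}

// defaultResyncPeriod plays the role of operatortypes.DefaultResyncPeriod.
const defaultResyncPeriod = 2 * time.Minute

// statusCheck is a hypothetical stand-in for r.status.CheckExistingAgentPods:
// it returns a non-empty Result (or an error) when the request should be
// requeued instead of continuing with the normal reconcile flow.
func statusCheck() (Result, error) {
	return Result{}, nil
}

func reconcile() (Result, error) {
	// Propagate the status manager's requeue decision or error instead of
	// always requeueing with Requeue: true.
	if res, err := statusCheck(); res != (Result{}) || err != nil {
		return res, err
	}
	// Otherwise fall back to the periodic resync so the reconciliation logic
	// re-runs even when no watched object changes.
	return Result{RequeueAfter: defaultResyncPeriod}, nil
}

func main() {
	res, err := reconcile()
	fmt.Println(res.RequeueAfter, err)
}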
16 changes: 8 additions & 8 deletions pkg/controller/pod/pod_controller_test.go
@@ -174,8 +174,8 @@ func (r *ReconcilePod) testReconcileOnWatchedResource(t *testing.T) {
if err != nil {
t.Fatalf("reconcile: (%v)", err)
}
if res.RequeueAfter != ResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v", ResyncPeriod)
if res.RequeueAfter != operatortypes.DefaultResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v", operatortypes.DefaultResyncPeriod)
}
r.client.Delete(context.TODO(), ncpDeployment)
}
@@ -213,8 +213,8 @@ func (r *ReconcilePod) testReconcileOnWatchedResourceWhenDeleted(t *testing.T) {
if err != nil {
t.Fatalf("reconcile: (%v)", err)
}
if res.RequeueAfter != ResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v", ResyncPeriod)
if res.RequeueAfter != operatortypes.DefaultResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v", operatortypes.DefaultResyncPeriod)
}

// Validate that reconcile recreated the deployment
@@ -289,9 +289,9 @@ func (r *ReconcilePod) testReconcileOnCLBNsxNodeAgentInvalidResolvConf(
if err != nil {
t.Fatalf("reconcile: (%v)", err)
}
if res.RequeueAfter != ResyncPeriod {
if res.RequeueAfter != operatortypes.DefaultResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v but it did "+
"after %v", ResyncPeriod, res.RequeueAfter)
"after %v", operatortypes.DefaultResyncPeriod, res.RequeueAfter)
}
obj := &corev1.Pod{}
namespacedName := types.NamespacedName{
@@ -336,9 +336,9 @@ func (r *ReconcilePod) testReconcileOnCLBNsxNodeAgentInvalidResolvConf(
if err != nil {
t.Fatalf("reconcile: (%v)", err)
}
if res.RequeueAfter != ResyncPeriod {
if res.RequeueAfter != operatortypes.DefaultResyncPeriod {
t.Fatalf("reconcile should requeue the request after %v but it did "+
"after %v", ResyncPeriod, res.RequeueAfter)
"after %v", operatortypes.DefaultResyncPeriod, res.RequeueAfter)
}
obj = &corev1.Pod{}
err = c.Get(context.TODO(), namespacedName, obj)
1 change: 0 additions & 1 deletion pkg/controller/sharedinfo/shared_info.go
@@ -26,7 +26,6 @@ var log = logf.Log.WithName("shared_info")
type SharedInfo struct {
AdaptorName string
AddNodeTag bool
LastNetworkAvailable map[string]time.Time
LastNodeAgentStartTime map[string]time.Time
NetworkConfig *configv1.Network
OperatorConfigMap *corev1.ConfigMap
138 changes: 80 additions & 58 deletions pkg/controller/statusmanager/pod_status.go
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
client "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var log = logf.Log.WithName("status_manager")
@@ -64,7 +65,7 @@ type deploymentState struct {
LastChangeTime time.Time
}

func FindStatusCondition(conditions []corev1.NodeCondition, conditionType corev1.NodeConditionType) *corev1.NodeCondition {
func FindNodeStatusCondition(conditions []corev1.NodeCondition, conditionType corev1.NodeConditionType) *corev1.NodeCondition {
for i := range conditions {
if conditions[i].Type == conditionType {
return &conditions[i]
@@ -78,7 +79,7 @@ func SetNodeCondition(conditions *[]corev1.NodeCondition, newCondition corev1.No
if conditions == nil {
conditions = &[]corev1.NodeCondition{}
}
existingCondition := FindStatusCondition(*conditions, newCondition.Type)
existingCondition := FindNodeStatusCondition(*conditions, newCondition.Type)
if existingCondition == nil {
newCondition.LastTransitionTime = metav1.NewTime(time.Now())
*conditions = append(*conditions, newCondition)
@@ -98,77 +99,96 @@ func SetNodeCondition(conditions *[]corev1.NodeCondition, newCondition corev1.No
// CheckExistingAgentPods covers this case: nsx-node-agent becomes unhealthy -> NetworkUnavailable=True -> operator goes down ->
// nsx-node-agent becomes healthy and keeps running -> operator comes back up -> the operator receives no nsx-node-agent event
// to set NetworkUnavailable=False. So a full sync at startup is necessary.
func (status *StatusManager) CheckExistingAgentPods(firstBoot *bool, sharedInfo *sharedinfo.SharedInfo) error {
func (status *StatusManager) CheckExistingAgentPods(firstBoot *bool, sharedInfo *sharedinfo.SharedInfo) (reconcile.Result, error) {
status.Lock()
defer status.Unlock()

if !*firstBoot {
return nil
return reconcile.Result{}, nil
}
log.Info("Checking all nsx-node-agent pods for node condition")
pods := &corev1.PodList{}
err := status.client.List(context.TODO(), pods, client.MatchingLabels{"component": operatortypes.NsxNodeAgentContainerName})
if err != nil {
log.Error(err, "Error getting pods for node condition")
return err
return reconcile.Result{}, err
}
if len(pods.Items) == 0 {
log.Info("nsx-node-agent not found for node condition")
nodes := corev1.NodeList{}
err = status.client.List(context.TODO(), &nodes)
if err != nil {
log.Error(err, "Failed to get nodes for condition updating")
return err
return reconcile.Result{}, err
}
for _, node := range nodes.Items {
status.setNodeNetworkUnavailable(node.ObjectMeta.Name, false, nil, "NSXNodeAgent", "Waiting for nsx-node-agent to be created", sharedInfo)
// In this case the nsx-node-agent is not created yet so we don't need to care about the graceful time
_, err := status.setNodeNetworkUnavailable(node.ObjectMeta.Name, false, nil, "NSXNodeAgent", "Waiting for nsx-node-agent to be created", sharedInfo)
if err != nil {
return reconcile.Result{}, err
}
}
*firstBoot = false
return nil
return reconcile.Result{}, nil
}
podName := types.NamespacedName{}
for _, pod := range pods.Items {
status.setNodeConditionFromPod(podName, sharedInfo, &pod)
if _, err := status.setNodeConditionFromPod(podName, sharedInfo, &pod); err != nil {
return reconcile.Result{}, err
}
}
*firstBoot = false
return nil
return reconcile.Result{}, nil
}

func (status *StatusManager) setNodeNetworkUnavailable(nodeName string, ready bool, startedAt *time.Time, reason string, message string, sharedInfo *sharedinfo.SharedInfo) {
if sharedInfo.LastNetworkAvailable == nil {
sharedInfo.LastNetworkAvailable = make(map[string]time.Time)
// assertNodeStatus lets the controller assume that information will eventually be correct but may be slightly out of date,
// and that changes need to be repeated if they are not observed after a given time (e.g. by returning a requeue result).
// For more details see https://github.com/kubernetes-sigs/controller-runtime/blob/v0.5.2/FAQ.md#q-my-cache-might-be-stale-if-i-read-from-a-cache-how-should-i-deal-with-that
func (status *StatusManager) assertNodeStatus(nodeName string, desireReady bool) (reconcile.Result, error) {
node := &corev1.Node{}
err := status.client.Get(context.TODO(), types.NamespacedName{Name: nodeName}, node)
if err != nil {
log.Error(err, fmt.Sprintf("Failed to check status updating for node %s", nodeName))
return reconcile.Result{}, err
}
if ready {
log.V(1).Info(fmt.Sprintf("Last network unavailable time of node %s is set to %s", nodeName, *startedAt))
sharedInfo.LastNetworkAvailable[nodeName] = *startedAt
existingCondition := FindNodeStatusCondition(node.Status.Conditions, "NetworkUnavailable")
if existingCondition == nil {
log.Info(fmt.Sprintf("Expecting node %s ready=%v, observed nil", nodeName, desireReady))
} else if desireReady && existingCondition.Status != corev1.ConditionFalse {
log.Info(fmt.Sprintf("Expecting node %s ready=true, observed false", nodeName))
} else if !desireReady && existingCondition.Status == corev1.ConditionFalse {
log.Info(fmt.Sprintf("Expecting node %s ready=false, observed true", nodeName))
} else {
delete(sharedInfo.LastNetworkAvailable, nodeName)
return reconcile.Result{}, nil
}
_, changed := status.combineNode(nodeName, ready, reason, message)
if !changed {
log.Info("Node condition is not changed")
return
}
log.Info(fmt.Sprintf("Setting status NetworkUnavailable to %v for node %s", !ready, nodeName))
return reconcile.Result{RequeueAfter: operatortypes.DefaultResyncPeriod}, nil
}

func (status *StatusManager) setNodeNetworkUnavailable(nodeName string, ready bool, startedAt *time.Time, reason string, message string, sharedInfo *sharedinfo.SharedInfo) (reconcile.Result, error) {
log.V(1).Info(fmt.Sprintf("Setting status NetworkUnavailable to %v for node %s", !ready, nodeName))
if !ready {
status.updateNodeStatus(nodeName, ready, reason, message)
return
err := status.updateNodeStatus(nodeName, ready, reason, message)
if err != nil {
return reconcile.Result{}, err
}
return status.assertNodeStatus(nodeName, ready)
}
// When the network status looks ready, we still need to wait for a graceful time
// When the network status looks ready, we should check whether we need to wait for a graceful period
now := time.Now()
startedTime := now.Sub(*startedAt)
go func() {
sleepTime := operatortypes.TimeBeforeRecoverNetwork - startedTime
time.Sleep(sleepTime)
if (sharedInfo.LastNetworkAvailable[nodeName]) != *startedAt {
log.V(1).Info(fmt.Sprintf("Last network unavailable time of node %s changed, expecting %v, got %v, goroutine exiting",
nodeName, *startedAt, sharedInfo.LastNetworkAvailable[nodeName]))
return
if startedTime >= operatortypes.TimeBeforeRecoverNetwork {
err := status.updateNodeStatus(nodeName, ready, reason, message)
if err != nil {
return reconcile.Result{}, err
}
log.Info(fmt.Sprintf("Setting status NetworkUnavailable to %v for node %s after %v", !ready, nodeName, sleepTime))
status.updateNodeStatus(nodeName, ready, reason, message)
return
}()
return status.assertNodeStatus(nodeName, ready)
}
if err := status.updateNodeStatus(nodeName, false, reason, message); err != nil {
return reconcile.Result{}, err
}
sleepTime := operatortypes.TimeBeforeRecoverNetwork - startedTime
log.V(1).Info(fmt.Sprintf("Waiting %v to double check network status for node %s", sleepTime, nodeName))
return reconcile.Result{RequeueAfter: sleepTime}, nil
}

func (status *StatusManager) combineNode(nodeName string, ready bool, reason string, message string) (*corev1.Node, bool) {
@@ -198,37 +218,40 @@ func (status *StatusManager) combineNode(nodeName string, ready bool, reason str
return node, changed
}

func (status *StatusManager) updateNodeStatus(nodeName string, ready bool, reason string, message string) {
func (status *StatusManager) updateNodeStatus(nodeName string, ready bool, reason string, message string) error {
// need to retrieve the node info, otherwise the update may fail because the node was modified while the monitor was sleeping
node, changed := status.combineNode(nodeName, ready, reason, message)
if !changed {
log.Info("Node condition is not changed, skip updating")
return
log.V(1).Info(fmt.Sprintf("Node condition is not changed (nodeName: %s, reason: %s, message: %s), skip updating", nodeName, reason, message))
return nil
}
err := status.client.Status().Update(context.TODO(), node)
if err != nil {
if err := status.client.Status().Update(context.TODO(), node); err != nil {
log.Error(err, fmt.Sprintf("Failed to update node condition NetworkUnavailable to %v for node %s", !ready, nodeName))
} else {
log.Info(fmt.Sprintf("Updated node condition NetworkUnavailable to %v for node %s", !ready, nodeName))
return err
}
log.Info(fmt.Sprintf("Updated node condition NetworkUnavailable to %v for node %s", !ready, nodeName))
return nil
}

// Get the pod status from API server
func (status *StatusManager) SetNodeConditionFromPod(podName types.NamespacedName, sharedInfo *sharedinfo.SharedInfo, pod *corev1.Pod) {
func (status *StatusManager) SetNodeConditionFromPod(podName types.NamespacedName, sharedInfo *sharedinfo.SharedInfo, pod *corev1.Pod) (reconcile.Result, error) {
status.Lock()
defer status.Unlock()
status.setNodeConditionFromPod(podName, sharedInfo, pod)
return status.setNodeConditionFromPod(podName, sharedInfo, pod)
}

func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedName, sharedInfo *sharedinfo.SharedInfo, pod *corev1.Pod) {
func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedName, sharedInfo *sharedinfo.SharedInfo, pod *corev1.Pod) (reconcile.Result, error) {
var reason string
now := time.Now()
if pod == nil {
pod = &corev1.Pod{}
err := status.client.Get(context.TODO(), podName, pod)
if err != nil {
if err := status.client.Get(context.TODO(), podName, pod); err != nil {
log.Error(err, fmt.Sprintf("Error getting %s for node condition", operatortypes.NsxNodeAgentContainerName))
return
isNotFound := errors.IsNotFound(err)
if isNotFound {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
}
containerStatus := corev1.ContainerStatus{}
@@ -237,6 +260,11 @@ func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedNam
// tmpMsgs is for the case where the pod restarted only a short time ago
var messages, tmpMsgs string
nodeName := pod.Spec.NodeName
if nodeName == "" {
// This may happen at an early stage while the pod is pending, though not every pending pod is missing spec.nodeName
log.Info(fmt.Sprintf("Pod %s has not been scheduled, skipping", podName))
return reconcile.Result{}, nil
}
var startedAt *time.Time
var lastStartedAt *time.Time
if sharedInfo.LastNodeAgentStartTime == nil {
@@ -252,7 +280,7 @@ func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedNam
nodeLastStartedAt := sharedInfo.LastNodeAgentStartTime[nodeName]
if podStartedAt.Before(nodeLastStartedAt) {
log.Info(fmt.Sprintf("Pod %s started at %v on node %s is outdated, there's new pod started at %v", podName, podStartedAt, nodeName, nodeLastStartedAt))
return
return reconcile.Result{}, nil
} else {
sharedInfo.LastNodeAgentStartTime[nodeName] = podStartedAt
}
@@ -303,13 +331,7 @@ func (status *StatusManager) setNodeConditionFromPod(podName types.NamespacedNam
}
}
}
if len(tmpMsgs) > 0 {
// if the pod just restarted, we still set the ready to false then try to set it to true,
// i.e. invoke the setNodeNetworkUnavailable 2 times because there's no sequent pod status change to
// trigger the pod controller to set ready to true.
status.setNodeNetworkUnavailable(nodeName, false, nil, "NSXNodeAgent", messages + tmpMsgs, sharedInfo)
}
status.setNodeNetworkUnavailable(nodeName, ready, lastStartedAt, "NSXNodeAgent", messages, sharedInfo)
return status.setNodeNetworkUnavailable(nodeName, ready, lastStartedAt, "NSXNodeAgent", messages + tmpMsgs, sharedInfo)
}

// SetFromPodsForOverall sets the operator Degraded/Progressing/Available status, based on
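pod_status.go previously recovered the NetworkUnavailable condition from a goroutine that slept for the remainder of TimeBeforeRecoverNetwork and compared timestamps in the LastNetworkAvailable map; the PR replaces that with requeue results: updateNodeStatus is applied immediately where possible, assertNodeStatus re-reads the node and requeues after DefaultResyncPeriod when the observed condition does not yet match, and the graceful window is handled by returning RequeueAfter with the remaining time. A small self-contained sketch of that graceful-window pattern follows; Result, timeBeforeRecoverNetwork, and setNetworkAvailable are simplified stand-ins for the operator's types, not its actual API.

package main

import (
	"fmt"
	"time"
)

// Result mirrors the shape of controller-runtime's reconcile.Result.
type Result struct {
	RequeueAfter time.Duration
}

// timeBeforeRecoverNetwork plays the role of operatortypes.TimeBeforeRecoverNetwork.
const timeBeforeRecoverNetwork = 180 * time.Second

// setNetworkAvailable is a hypothetical stand-in for updateNodeStatus with ready=true.
func setNetworkAvailable(nodeName string) error {
	fmt.Printf("node %s: NetworkUnavailable set to false\n", nodeName)
	return nil
}

// recoverAfterGrace flips the node condition only once the agent pod has been
// running for the full grace window; before that it keeps the condition as-is
// and asks to be requeued for the remaining time, instead of sleeping in a
// goroutine as the old code did.
func recoverAfterGrace(nodeName string, startedAt time.Time) (Result, error) {
	upFor := time.Since(startedAt)
	if upFor >= timeBeforeRecoverNetwork {
		return Result{}, setNetworkAvailable(nodeName)
	}
	return Result{RequeueAfter: timeBeforeRecoverNetwork - upFor}, nil
}

func main() {
	// The pod started 30 seconds ago, so the reconciler requeues after ~150s.
	res, err := recoverAfterGrace("node-1", time.Now().Add(-30*time.Second))
	fmt.Println(res.RequeueAfter, err)
}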
1 change: 1 addition & 0 deletions pkg/types/names.go
@@ -36,4 +36,5 @@ const (
NsxNodeAgentContainerName string = "nsx-node-agent"
OsReleaseFile string = "/host/etc/os-release"
TimeBeforeRecoverNetwork time.Duration = 180 * time.Second
DefaultResyncPeriod time.Duration = 2 * time.Minute
)