From 4d3ba46cd2f7686feddbcfc49067735f423837a3 Mon Sep 17 00:00:00 2001 From: Arjun Baindur Date: Tue, 15 Feb 2022 20:31:03 -0800 Subject: [PATCH] Rechecking pending Pods (conflict resolved) --- doc/crds/daemonset-install.yaml | 2 + pkg/reconciler/iploop.go | 70 ++++++++++++++++++++++++++++---- pkg/reconciler/wrappedPod.go | 11 +++++ pkg/storage/kubernetes/client.go | 9 ++++ pkg/storage/storage.go | 3 +- 5 files changed, 86 insertions(+), 9 deletions(-) diff --git a/doc/crds/daemonset-install.yaml b/doc/crds/daemonset-install.yaml index 2c6ad832a..e758acd72 100644 --- a/doc/crds/daemonset-install.yaml +++ b/doc/crds/daemonset-install.yaml @@ -47,6 +47,7 @@ rules: - pods verbs: - list +<<<<<<< HEAD - watch - apiGroups: [""] resources: @@ -69,6 +70,7 @@ rules: - create - patch - update + - get --- apiVersion: apps/v1 kind: DaemonSet diff --git a/pkg/reconciler/iploop.go b/pkg/reconciler/iploop.go index c2c79cfc5..cc2b8fd65 100644 --- a/pkg/reconciler/iploop.go +++ b/pkg/reconciler/iploop.go @@ -103,19 +103,73 @@ func (rl *ReconcileLooper) findOrphanedIPsPerPool(ipPools []storage.IPPool) erro func (rl ReconcileLooper) isPodAlive(podRef string, ip string) bool { for livePodRef, livePod := range rl.liveWhereaboutsPods { if podRef == livePodRef { - livePodIPs := livePod.ips - logging.Debugf( - "pod reference %s matches allocation; Allocation IP: %s; PodIPs: %s", - livePodRef, - ip, - livePodIPs) - _, isFound := livePodIPs[ip] - return isFound || livePod.phase == v1.PodPending + isFound := isIpOnPod(&livePod, podRef, ip) + if !isFound && (livePod.phase == v1.PodPending) { + /* Sometimes pods are still coming up, and may not yet have Multus + * annotation added to it yet. We don't want to check the IPs yet + * so re-fetch the Pod 5x + */ + podToMatch := &livePod + retries := 0 + + logging.Debugf("Re-fetching Pending Pod: %s IP-to-match: %s", livePodRef, ip) + + for retries < storage.PodRefreshRetries { + retries += 1 + podToMatch = rl.refreshPod(livePodRef) + if podToMatch == nil { + logging.Debugf("Cleaning up...") + return false + } else if podToMatch.phase != v1.PodPending { + logging.Debugf("Pending Pod is now in phase: %s", podToMatch.phase) + break + } else { + isFound = isIpOnPod(podToMatch, podRef, ip) + // Short-circuit - Pending Pod may have IP now + if isFound { + logging.Debugf("Pod now has IP annotation while in Pending") + return true + } + time.Sleep(time.Duration(500) * time.Millisecond) + } + } + isFound = isIpOnPod(podToMatch, podRef, ip) + } + + return isFound } } return false } +func (rl ReconcileLooper) refreshPod(podRef string) *podWrapper { + namespace, podName := splitPodRef(podRef) + if namespace == "" || podName == "" { + logging.Errorf("Invalid podRef format: %s", podRef) + return nil + } + + pod, err := rl.k8sClient.GetPod(namespace, podName) + if err != nil { + logging.Errorf("Failed to refresh Pod %s: %s\n", podRef, err) + return nil + } + + wrappedPod := wrapPod(*pod) + logging.Debugf("Got refreshed pod: %v", wrappedPod) + return wrappedPod +} + +func splitPodRef(podRef string) (string, string) { + namespacedName := strings.Split(podRef, "/") + if len(namespacedName) != 2 { + logging.Errorf("Failed to split podRef %s", podRef) + return "", "" + } + + return namespacedName[0], namespacedName[1] +} + func composePodRef(pod v1.Pod) string { return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName()) } diff --git a/pkg/reconciler/wrappedPod.go b/pkg/reconciler/wrappedPod.go index 9f4f81610..cb5e871df 100644 --- a/pkg/reconciler/wrappedPod.go +++ b/pkg/reconciler/wrappedPod.go @@ -89,3 +89,14 @@ func networkStatusFromPod(pod v1.Pod) string { } return networkStatusAnnotationValue } + +func isIpOnPod(livePod *podWrapper, podRef, ip string) bool { + livePodIPs := livePod.ips + logging.Debugf( + "pod reference %s matches allocation; Allocation IP: %s; PodIPs: %s", + podRef, + ip, + livePodIPs) + _, isFound := livePodIPs[ip] + return isFound +} diff --git a/pkg/storage/kubernetes/client.go b/pkg/storage/kubernetes/client.go index 6e2e8fca5..2fdc517af 100644 --- a/pkg/storage/kubernetes/client.go +++ b/pkg/storage/kubernetes/client.go @@ -107,6 +107,15 @@ func (i *Client) ListPods(ctx context.Context) ([]v1.Pod, error) { return podList.Items, nil } +func (i *Client) GetPod(namespace, name string) (*v1.Pod, error) { + pod, err := i.clientSet.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + return pod, nil +} + func (i *Client) ListOverlappingIPs(ctx context.Context) ([]whereaboutsv1alpha1.OverlappingRangeIPReservation, error) { ctxWithTimeout, cancel := context.WithTimeout(ctx, storage.RequestTimeout) defer cancel() diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 9ebac433d..52660eee2 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -13,7 +13,8 @@ var ( RequestTimeout = 10 * time.Second // DatastoreRetries defines how many retries are attempted when updating the Pool - DatastoreRetries = 100 + DatastoreRetries = 100 + PodRefreshRetries = 3 ) // IPPool is the interface that represents an manageable pool of allocated IPs