Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align api calls timeouts cronjob ip reconciler #480

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 7 additions & 13 deletions cmd/whereabouts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ func AllocateAndReleaseAddressesTest(ipRange string, gw string, kubeconfigPath s
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(conf.IPRanges[0].Range, podNamespace, ipamNetworkName)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

for i := 0; i < len(expectedAddresses); i++ {
name := fmt.Sprintf("%s-%d", podName, i)
Expand Down Expand Up @@ -164,8 +163,7 @@ var _ = Describe("Whereabouts operations", func() {
fake.NewSimpleClientset(
ipPool(ipamConf.IPRanges[0].Range, podNamespace, ipamNetworkName, []whereaboutstypes.IPReservation{
{PodRef: ipamConf.GetPodRef(), IfName: ifname, IP: net.ParseIP(expectedAddress)}, {PodRef: "test"}}...)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

cniConf, err := newCNINetConf(cniVersion, ipamConf)
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -928,8 +926,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(ipamConf.IPRanges[0].Range, podNamespace, ipamConf.NetworkName)),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// allocate 8 IPs (192.168.1.5 - 192.168.1.12); the entirety of the pool defined above
for i := 0; i < 8; i++ {
Expand Down Expand Up @@ -1000,8 +997,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1124,8 +1120,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1248,8 +1243,7 @@ var _ = Describe("Whereabouts operations", func() {
wbClient := *kubernetes.NewKubernetesClient(
fake.NewSimpleClientset(
ipPool(firstRange, podNamespace, ""), ipPool(secondRange, podNamespace, "")),
fakek8sclient.NewSimpleClientset(),
0)
fakek8sclient.NewSimpleClientset())

// ----------------------------- range 1

Expand Down Expand Up @@ -1373,7 +1367,7 @@ func newK8sIPAM(containerID, ifName string, ipamConf *whereaboutstypes.IPAMConfi
if err != nil {
return nil
}
k8sIPAM.Client = *kubernetes.NewKubernetesClient(wbClient, k8sCoreClient, 0)
k8sIPAM.Client = *kubernetes.NewKubernetesClient(wbClient, k8sCoreClient)
return k8sIPAM
}

Expand Down
10 changes: 10 additions & 0 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"net"
"os"
"sort"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -598,6 +599,10 @@ var _ = Describe("Whereabouts functionality", func() {
It("can reclaim the previously allocated IPs", func() {
By("checking that the IP allocation is removed when the pod is deleted")
Expect(clientInfo.ScaleStatefulSet(serviceName, namespace, -1)).To(Succeed())

const podDeleteTimeout = 20 * time.Second
err := wbtestclient.WaitForPodToDisappear(context.Background(), clientInfo.Client, namespace, podName, podDeleteTimeout)
Expect(err).NotTo(HaveOccurred())
verifyNoAllocationsForPodRef(clientInfo, rangeWithTwoIPs, namespace, podName, secondaryIPs)

By("adding previous allocations")
Expand Down Expand Up @@ -895,6 +900,11 @@ func allocationForPodRef(podRef string, ipPool v1alpha1.IPPool) []v1alpha1.IPAll
allocations = append(allocations, allocation)
}
}

sort.Slice(allocations, func(i, j int) bool {
return allocations[i].IfName < allocations[j].IfName
})

return allocations
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controlloop/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (pc *PodController) garbageCollectPodIPs(pod *v1.Pod) error {
if allocation.PodRef == podID(podNamespace, podName) {
logging.Verbosef("stale allocation to cleanup: %+v", allocation)

client := *wbclient.NewKubernetesClient(nil, pc.k8sClient, 0)
client := *wbclient.NewKubernetesClient(nil, pc.k8sClient)
wbClient := &wbclient.KubernetesIPAM{
Client: client,
Config: *ipamConfig,
Expand Down
15 changes: 3 additions & 12 deletions pkg/reconciler/ip.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,20 @@
package reconciler

import (
"context"
"time"

"github.com/k8snetworkplumbingwg/whereabouts/pkg/logging"
)

const (
defaultReconcilerTimeout = 30
)

func ReconcileIPs(errorChan chan error) {
logging.Verbosef("starting reconciler run")
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(defaultReconcilerTimeout*time.Second))
defer cancel()

ipReconcileLoop, err := NewReconcileLooper(ctx, defaultReconcilerTimeout)
ipReconcileLoop, err := NewReconcileLooper()
if err != nil {
_ = logging.Errorf("failed to create the reconcile looper: %v", err)
errorChan <- err
return
}

cleanedUpIps, err := ipReconcileLoop.ReconcileIPPools(ctx)
cleanedUpIps, err := ipReconcileLoop.ReconcileIPPools()
if err != nil {
_ = logging.Errorf("failed to clean up IP for allocations: %v", err)
errorChan <- err
Expand All @@ -36,7 +27,7 @@ func ReconcileIPs(errorChan chan error) {
logging.Debugf("no IP addresses to cleanup")
}

if err := ipReconcileLoop.ReconcileOverlappingIPAddresses(ctx); err != nil {
if err := ipReconcileLoop.ReconcileOverlappingIPAddresses(); err != nil {
errorChan <- err
return
}
Expand Down
35 changes: 17 additions & 18 deletions pkg/reconciler/ip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ var _ = Describe("Whereabouts IP reconciler", func() {
namespace = "default"
networkName = "net1"
podName = "pod1"
timeout = 10
)

var (
Expand Down Expand Up @@ -75,16 +74,16 @@ var _ = Describe("Whereabouts IP reconciler", func() {
Context("reconciling the IPPool", func() {
BeforeEach(func() {
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("should report the deleted IP reservation", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
Expect(reconcileLooper.ReconcileIPPools()).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
})

It("the pool's orphaned IP should be deleted after the reconcile loop", func() {
_, err := reconcileLooper.ReconcileIPPools(context.TODO())
_, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
poolAfterCleanup, err := wbClient.WhereaboutsV1alpha1().IPPools(namespace).Get(context.TODO(), pool.GetName(), metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -139,18 +138,18 @@ var _ = Describe("Whereabouts IP reconciler", func() {
Context("reconciling the IPPool", func() {
BeforeEach(func() {
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("should report the dead pod's IP address as deleted", func() {
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).To(Equal([]net.IP{net.ParseIP("10.10.10.1")}))
})

It("the IPPool should have only the IP reservation of the live pod", func() {
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).NotTo(BeEmpty())

Expand Down Expand Up @@ -190,11 +189,11 @@ var _ = Describe("Whereabouts IP reconciler", func() {

By("initializing the reconciler")
var err error
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())

By("reconciling and checking that the correct entry is deleted")
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools(context.TODO())
deletedIPAddrs, err := reconcileLooper.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(deletedIPAddrs).To(Equal([]net.IP{net.ParseIP("10.10.10.2")}))

Expand Down Expand Up @@ -272,9 +271,9 @@ var _ = Describe("Whereabouts IP reconciler", func() {

It("will delete an orphaned IP address", func() {
Expect(k8sClientSet.CoreV1().Pods(namespace).Delete(context.TODO(), pods[podIndexToRemove].Name, metav1.DeleteOptions{})).NotTo(HaveOccurred())
newReconciler, err := NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
newReconciler, err := NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
Expect(newReconciler.ReconcileOverlappingIPAddresses(context.TODO())).To(Succeed())
Expect(newReconciler.ReconcileOverlappingIPAddresses()).To(Succeed())

expectedClusterWideIPs := 2
clusterWideIPAllocations, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).List(context.TODO(), metav1.ListOptions{})
Expand Down Expand Up @@ -338,9 +337,9 @@ var _ = Describe("Whereabouts IP reconciler", func() {
})

It("will not delete an IP address that isn't orphaned after running reconciler", func() {
newReconciler, err := NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
newReconciler, err := NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
Expect(newReconciler.ReconcileOverlappingIPAddresses(context.TODO())).To(Succeed())
Expect(newReconciler.ReconcileOverlappingIPAddresses()).To(Succeed())

expectedClusterWideIPs := 1
clusterWideIPAllocations, err := wbClient.WhereaboutsV1alpha1().OverlappingRangeIPReservations(namespace).List(context.TODO(), metav1.ListOptions{})
Expand Down Expand Up @@ -369,12 +368,12 @@ var _ = Describe("Whereabouts IP reconciler", func() {

pool = generateIPPoolSpec(ipRange, namespace, poolName, pod.Name)
wbClient = fakewbclient.NewSimpleClientset(pool)
reconcileLooper, err = NewReconcileLooperWithClient(context.TODO(), kubernetes.NewKubernetesClient(wbClient, k8sClientSet, timeout), timeout)
reconcileLooper, err = NewReconcileLooperWithClient(kubernetes.NewKubernetesClient(wbClient, k8sClientSet))
Expect(err).NotTo(HaveOccurred())
})

It("can be reconciled", func() {
Expect(reconcileLooper.ReconcileIPPools(context.TODO())).NotTo(BeEmpty())
Expect(reconcileLooper.ReconcileIPPools()).NotTo(BeEmpty())
})
})
})
Expand Down Expand Up @@ -410,7 +409,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does not delete anything", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(BeEmpty())
})
Expand Down Expand Up @@ -438,7 +437,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does delete the orphaned IP address", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(Equal([]net.IP{net.ParseIP(firstIPInRange)}))
})
Expand All @@ -458,7 +457,7 @@ var _ = Describe("IPReconciler", func() {
})

It("does delete *only the orphaned* the IP address", func() {
reconciledIPs, err := ipReconciler.ReconcileIPPools(context.TODO())
reconciledIPs, err := ipReconciler.ReconcileIPPools()
Expect(err).NotTo(HaveOccurred())
Expect(reconciledIPs).To(ConsistOf([]net.IP{net.ParseIP("192.168.14.2")}))
})
Expand Down
46 changes: 17 additions & 29 deletions pkg/reconciler/iploop.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,29 @@ type ReconcileLooper struct {
liveWhereaboutsPods map[string]podWrapper
orphanedIPs []OrphanedIPReservations
orphanedClusterWideIPs []whereaboutsv1alpha1.OverlappingRangeIPReservation
requestTimeout int
}

type OrphanedIPReservations struct {
Pool storage.IPPool
Allocations []types.IPReservation
}

func NewReconcileLooperWithKubeconfig(ctx context.Context, kubeconfigPath string, timeout int) (*ReconcileLooper, error) {
logging.Debugf("NewReconcileLooper - Kubernetes config file located at: %s", kubeconfigPath)
k8sClient, err := kubernetes.NewClientViaKubeconfig(kubeconfigPath, time.Duration(timeout)*time.Second)
if err != nil {
return nil, logging.Errorf("failed to instantiate the Kubernetes client: %+v", err)
}
return NewReconcileLooperWithClient(ctx, k8sClient, timeout)
}

func NewReconcileLooper(ctx context.Context, timeout int) (*ReconcileLooper, error) {
func NewReconcileLooper() (*ReconcileLooper, error) {
logging.Debugf("NewReconcileLooper - inferred connection data")
k8sClient, err := kubernetes.NewClient(time.Duration(timeout) * time.Second)
k8sClient, err := kubernetes.NewClient()
if err != nil {
return nil, logging.Errorf("failed to instantiate the Kubernetes client: %+v", err)
}
return NewReconcileLooperWithClient(ctx, k8sClient, timeout)
return NewReconcileLooperWithClient(k8sClient)
}

func NewReconcileLooperWithClient(ctx context.Context, k8sClient *kubernetes.Client, timeout int) (*ReconcileLooper, error) {
ipPools, err := k8sClient.ListIPPools(ctx)
func NewReconcileLooperWithClient(k8sClient *kubernetes.Client) (*ReconcileLooper, error) {
ipPools, err := k8sClient.ListIPPools()
if err != nil {
return nil, logging.Errorf("failed to retrieve all IP pools: %v", err)
}

pods, err := k8sClient.ListPods(ctx)
pods, err := k8sClient.ListPods()
if err != nil {
return nil, err
}
Expand All @@ -62,14 +52,13 @@ func NewReconcileLooperWithClient(ctx context.Context, k8sClient *kubernetes.Cli
looper := &ReconcileLooper{
k8sClient: *k8sClient,
liveWhereaboutsPods: indexPods(pods, whereaboutsPodRefs),
requestTimeout: timeout,
}

if err := looper.findOrphanedIPsPerPool(ipPools); err != nil {
return nil, err
}

if err := looper.findClusterWideIPReservations(ctx); err != nil {
if err := looper.findClusterWideIPReservations(); err != nil {
return nil, err
}
return looper, nil
Expand Down Expand Up @@ -173,7 +162,7 @@ func composePodRef(pod v1.Pod) string {
return fmt.Sprintf("%s/%s", pod.GetNamespace(), pod.GetName())
}

func (rl ReconcileLooper) ReconcileIPPools(ctx context.Context) ([]net.IP, error) {
func (rl ReconcileLooper) ReconcileIPPools() ([]net.IP, error) {
findAllocationIndex := func(reservation types.IPReservation, reservations []types.IPReservation) int {
for idx, r := range reservations {
if r.PodRef == reservation.PodRef && r.IP.Equal(reservation.IP) {
Expand Down Expand Up @@ -206,21 +195,23 @@ func (rl ReconcileLooper) ReconcileIPPools(ctx context.Context) ([]net.IP, error

if len(cleanedUpIpsPerPool) != 0 {
logging.Debugf("Going to update the reserve list to: %+v", currentIPReservations)

ctx, cancel := context.WithTimeout(context.Background(), storage.RequestTimeout)
if err := orphanedIP.Pool.Update(ctx, currentIPReservations); err != nil {
cancel()
return nil, logging.Errorf("failed to update the reservation list: %v", err)
}

cancel()
totalCleanedUpIps = append(totalCleanedUpIps, cleanedUpIpsPerPool...)
}
}

return totalCleanedUpIps, nil
}

func (rl *ReconcileLooper) findClusterWideIPReservations(ctx context.Context) error {
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(rl.requestTimeout)*time.Second)
defer cancel()

clusterWideIPReservations, err := rl.k8sClient.ListOverlappingIPs(ctxWithTimeout)
func (rl *ReconcileLooper) findClusterWideIPReservations() error {
clusterWideIPReservations, err := rl.k8sClient.ListOverlappingIPs()
if err != nil {
return logging.Errorf("failed to list all OverLappingIPs: %v", err)
}
Expand All @@ -243,14 +234,11 @@ func (rl *ReconcileLooper) findClusterWideIPReservations(ctx context.Context) er
return nil
}

func (rl ReconcileLooper) ReconcileOverlappingIPAddresses(ctx context.Context) error {
func (rl ReconcileLooper) ReconcileOverlappingIPAddresses() error {
var failedReconciledClusterWideIPs []string

ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Duration(rl.requestTimeout)*time.Second)
defer cancel()

for _, overlappingIPStruct := range rl.orphanedClusterWideIPs {
if err := rl.k8sClient.DeleteOverlappingIP(ctxWithTimeout, &overlappingIPStruct); err != nil {
if err := rl.k8sClient.DeleteOverlappingIP(&overlappingIPStruct); err != nil {
logging.Errorf("failed to remove cluster wide IP: %s", overlappingIPStruct.GetName())
failedReconciledClusterWideIPs = append(failedReconciledClusterWideIPs, overlappingIPStruct.GetName())
continue
Expand Down
Loading
Loading