Skip to content

Commit

Permalink
Fix EndpointSlice reconciliation
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Kvapil <kvapss@gmail.com>
  • Loading branch information
kvaps committed Dec 6, 2024
1 parent b7a51ba commit cc87e41
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 1,895 deletions.
2 changes: 1 addition & 1 deletion packages/apps/kubernetes/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.14.0
version: 0.14.1

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
ghcr.io/aenix-io/cozystack/kubevirt-cloud-provider:0.14.0@sha256:55b78220b60773eefb7b7d3451d7ab9fe89fb6b989e8fe2ae214aab164f00293
ghcr.io/aenix-io/cozystack/kubevirt-cloud-provider:latest@sha256:8fc186c44669c15d001d84035caae2fe4676dc8eb0bce75496cff500d36e7570
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ FROM --platform=linux/amd64 golang:1.20.6 AS builder

RUN git clone https://github.com/kubevirt/cloud-provider-kubevirt /go/src/kubevirt.io/cloud-provider-kubevirt \
&& cd /go/src/kubevirt.io/cloud-provider-kubevirt \
&& git checkout adbd6c27468b86b020cf38490e84f124ef24ab62
&& git checkout da9e0cf

WORKDIR /go/src/kubevirt.io/cloud-provider-kubevirt

# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/291
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/335
# see: https://github.com/kubevirt/cloud-provider-kubevirt/pull/336
ADD patches /patches
RUN git apply /patches/external-traffic-policy-local.diff
RUN git apply /patches/*.diff
RUN go get 'k8s.io/endpointslice/util@v0.28' 'k8s.io/apiserver@v0.28'
RUN go mod tidy
RUN go mod vendor
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index a3c1aa33..1e76e6cc 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -413,10 +413,14 @@ func (c *Controller) reconcileByAddressType(service *v1.Service, tenantSlices []
var desiredPorts []discovery.EndpointPort

for _, port := range service.Spec.Ports {
+ pName := port.Name
+ pProtocol := port.Protocol
+ pVal := port.TargetPort.IntVal
+
desiredPorts = append(desiredPorts, discovery.EndpointPort{
- Port: &port.TargetPort.IntVal,
- Protocol: &port.Protocol,
- Name: &port.Name,
+ Port: &pVal,
+ Protocol: &pProtocol,
+ Name: &pName,
})
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go
index a3c1aa33..6f6e3d32 100644
--- a/pkg/controller/kubevirteps/kubevirteps_controller.go
+++ b/pkg/controller/kubevirteps/kubevirteps_controller.go
@@ -108,32 +108,24 @@ func newRequest(reqType ReqType, obj interface{}, oldObj interface{}) *Request {
}

func (c *Controller) Init() error {
-
- // Act on events from Services on the infra cluster. These are created by the EnsureLoadBalancer function.
- // We need to watch for these events so that we can update the EndpointSlices in the infra cluster accordingly.
+ // Existing Service event handlers...
_, err := c.infraFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
- // cast obj to Service
svc := obj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service added: %v/%v", svc.Namespace, svc.Name)
c.queue.Add(newRequest(AddReq, obj, nil))
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
- // cast obj to Service
newSvc := newObj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if newSvc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service updated: %v/%v", newSvc.Namespace, newSvc.Name)
c.queue.Add(newRequest(UpdateReq, newObj, oldObj))
}
},
DeleteFunc: func(obj interface{}) {
- // cast obj to Service
svc := obj.(*v1.Service)
- // Only act on Services of type LoadBalancer
if svc.Spec.Type == v1.ServiceTypeLoadBalancer {
klog.Infof("Service deleted: %v/%v", svc.Namespace, svc.Name)
c.queue.Add(newRequest(DeleteReq, obj, nil))
@@ -144,7 +136,7 @@ func (c *Controller) Init() error {
return err
}

- // Monitor endpoint slices that we are interested in based on known services in the infra cluster
+ // Existing EndpointSlice event handlers in tenant cluster...
_, err = c.tenantFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
eps := obj.(*discovery.EndpointSlice)
@@ -194,10 +186,80 @@ func (c *Controller) Init() error {
return err
}

- //TODO: Add informer for EndpointSlices in the infra cluster to watch for (unwanted) changes
+ // Add an informer for EndpointSlices in the infra cluster
+ _, err = c.infraFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ eps := obj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice added: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(AddReq, svc, nil))
+ }
+ }
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ eps := newObj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice updated: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(UpdateReq, svc, nil))
+ }
+ }
+ },
+ DeleteFunc: func(obj interface{}) {
+ eps := obj.(*discovery.EndpointSlice)
+ if c.managedByController(eps) {
+ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps)
+ if svcErr != nil {
+ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s on delete: %v", eps.Namespace, eps.Name, svcErr)
+ return
+ }
+ if svc != nil {
+ klog.Infof("Infra EndpointSlice deleted: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name)
+ c.queue.Add(newRequest(DeleteReq, svc, nil))
+ }
+ }
+ },
+ })
+ if err != nil {
+ return err
+ }
+
return nil
}

+// getInfraServiceForEPS returns the Service in the infra cluster associated with the given EndpointSlice.
+// It does this by reading the "kubernetes.io/service-name" label from the EndpointSlice, which should correspond
+// to the Service name. If not found or if the Service doesn't exist, it returns nil.
+func (c *Controller) getInfraServiceForEPS(ctx context.Context, eps *discovery.EndpointSlice) (*v1.Service, error) {
+ svcName := eps.Labels[discovery.LabelServiceName]
+ if svcName == "" {
+ // No service name label found, can't determine infra service.
+ return nil, nil
+ }
+
+ svc, err := c.infraClient.CoreV1().Services(c.infraNamespace).Get(ctx, svcName, metav1.GetOptions{})
+ if err != nil {
+ if k8serrors.IsNotFound(err) {
+ // Service doesn't exist
+ return nil, nil
+ }
+ return nil, err
+ }
+
+ return svc, nil
+}
+
// Run starts an asynchronous loop that monitors and updates GKENetworkParamSet in the cluster.
func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
defer utilruntime.HandleCrash()
Loading

0 comments on commit cc87e41

Please sign in to comment.