|
| 1 | +diff --git a/pkg/controller/kubevirteps/kubevirteps_controller.go b/pkg/controller/kubevirteps/kubevirteps_controller.go |
| 2 | +index a3c1aa33..6f6e3d32 100644 |
| 3 | +--- a/pkg/controller/kubevirteps/kubevirteps_controller.go |
| 4 | ++++ b/pkg/controller/kubevirteps/kubevirteps_controller.go |
| 5 | +@@ -108,32 +108,24 @@ func newRequest(reqType ReqType, obj interface{}, oldObj interface{}) *Request { |
| 6 | + } |
| 7 | + |
| 8 | + func (c *Controller) Init() error { |
| 9 | +- |
| 10 | +- // Act on events from Services on the infra cluster. These are created by the EnsureLoadBalancer function. |
| 11 | +- // We need to watch for these events so that we can update the EndpointSlices in the infra cluster accordingly. |
| 12 | ++ // Existing Service event handlers... |
| 13 | + _, err := c.infraFactory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| 14 | + AddFunc: func(obj interface{}) { |
| 15 | +- // cast obj to Service |
| 16 | + svc := obj.(*v1.Service) |
| 17 | +- // Only act on Services of type LoadBalancer |
| 18 | + if svc.Spec.Type == v1.ServiceTypeLoadBalancer { |
| 19 | + klog.Infof("Service added: %v/%v", svc.Namespace, svc.Name) |
| 20 | + c.queue.Add(newRequest(AddReq, obj, nil)) |
| 21 | + } |
| 22 | + }, |
| 23 | + UpdateFunc: func(oldObj, newObj interface{}) { |
| 24 | +- // cast obj to Service |
| 25 | + newSvc := newObj.(*v1.Service) |
| 26 | +- // Only act on Services of type LoadBalancer |
| 27 | + if newSvc.Spec.Type == v1.ServiceTypeLoadBalancer { |
| 28 | + klog.Infof("Service updated: %v/%v", newSvc.Namespace, newSvc.Name) |
| 29 | + c.queue.Add(newRequest(UpdateReq, newObj, oldObj)) |
| 30 | + } |
| 31 | + }, |
| 32 | + DeleteFunc: func(obj interface{}) { |
| 33 | +- // cast obj to Service |
| 34 | + svc := obj.(*v1.Service) |
| 35 | +- // Only act on Services of type LoadBalancer |
| 36 | + if svc.Spec.Type == v1.ServiceTypeLoadBalancer { |
| 37 | + klog.Infof("Service deleted: %v/%v", svc.Namespace, svc.Name) |
| 38 | + c.queue.Add(newRequest(DeleteReq, obj, nil)) |
| 39 | +@@ -144,7 +136,7 @@ func (c *Controller) Init() error { |
| 40 | + return err |
| 41 | + } |
| 42 | + |
| 43 | +- // Monitor endpoint slices that we are interested in based on known services in the infra cluster |
| 44 | ++ // Existing EndpointSlice event handlers in tenant cluster... |
| 45 | + _, err = c.tenantFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| 46 | + AddFunc: func(obj interface{}) { |
| 47 | + eps := obj.(*discovery.EndpointSlice) |
| 48 | +@@ -194,10 +186,80 @@ func (c *Controller) Init() error { |
| 49 | + return err |
| 50 | + } |
| 51 | + |
| 52 | +- //TODO: Add informer for EndpointSlices in the infra cluster to watch for (unwanted) changes |
| 53 | ++ // Add an informer for EndpointSlices in the infra cluster |
| 54 | ++ _, err = c.infraFactory.Discovery().V1().EndpointSlices().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ |
| 55 | ++ AddFunc: func(obj interface{}) { |
| 56 | ++ eps := obj.(*discovery.EndpointSlice) |
| 57 | ++ if c.managedByController(eps) { |
| 58 | ++ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps) |
| 59 | ++ if svcErr != nil { |
| 60 | ++ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr) |
| 61 | ++ return |
| 62 | ++ } |
| 63 | ++ if svc != nil { |
| 64 | ++ klog.Infof("Infra EndpointSlice added: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name) |
| 65 | ++ c.queue.Add(newRequest(AddReq, svc, nil)) |
| 66 | ++ } |
| 67 | ++ } |
| 68 | ++ }, |
| 69 | ++ UpdateFunc: func(oldObj, newObj interface{}) { |
| 70 | ++ eps := newObj.(*discovery.EndpointSlice) |
| 71 | ++ if c.managedByController(eps) { |
| 72 | ++ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps) |
| 73 | ++ if svcErr != nil { |
| 74 | ++ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s: %v", eps.Namespace, eps.Name, svcErr) |
| 75 | ++ return |
| 76 | ++ } |
| 77 | ++ if svc != nil { |
| 78 | ++ klog.Infof("Infra EndpointSlice updated: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name) |
| 79 | ++ c.queue.Add(newRequest(UpdateReq, svc, nil)) |
| 80 | ++ } |
| 81 | ++ } |
| 82 | ++ }, |
| 83 | ++ DeleteFunc: func(obj interface{}) { |
| 84 | ++ eps := obj.(*discovery.EndpointSlice) |
| 85 | ++ if c.managedByController(eps) { |
| 86 | ++ svc, svcErr := c.getInfraServiceForEPS(context.TODO(), eps) |
| 87 | ++ if svcErr != nil { |
| 88 | ++ klog.Errorf("Failed to get infra Service for EndpointSlice %s/%s on delete: %v", eps.Namespace, eps.Name, svcErr) |
| 89 | ++ return |
| 90 | ++ } |
| 91 | ++ if svc != nil { |
| 92 | ++ klog.Infof("Infra EndpointSlice deleted: %v/%v, requeuing Service: %v/%v", eps.Namespace, eps.Name, svc.Namespace, svc.Name) |
| 93 | ++ c.queue.Add(newRequest(DeleteReq, svc, nil)) |
| 94 | ++ } |
| 95 | ++ } |
| 96 | ++ }, |
| 97 | ++ }) |
| 98 | ++ if err != nil { |
| 99 | ++ return err |
| 100 | ++ } |
| 101 | ++ |
| 102 | + return nil |
| 103 | + } |
| 104 | + |
| 105 | ++// getInfraServiceForEPS returns the Service in the infra cluster associated with the given EndpointSlice. |
| 106 | ++// It does this by reading the "kubernetes.io/service-name" label from the EndpointSlice, which should correspond |
| 107 | ++// to the Service name. If not found or if the Service doesn't exist, it returns nil. |
| 108 | ++func (c *Controller) getInfraServiceForEPS(ctx context.Context, eps *discovery.EndpointSlice) (*v1.Service, error) { |
| 109 | ++ svcName := eps.Labels[discovery.LabelServiceName] |
| 110 | ++ if svcName == "" { |
| 111 | ++ // No service name label found, can't determine infra service. |
| 112 | ++ return nil, nil |
| 113 | ++ } |
| 114 | ++ |
| 115 | ++ svc, err := c.infraClient.CoreV1().Services(c.infraNamespace).Get(ctx, svcName, metav1.GetOptions{}) |
| 116 | ++ if err != nil { |
| 117 | ++ if k8serrors.IsNotFound(err) { |
| 118 | ++ // Service doesn't exist |
| 119 | ++ return nil, nil |
| 120 | ++ } |
| 121 | ++ return nil, err |
| 122 | ++ } |
| 123 | ++ |
| 124 | ++ return svc, nil |
| 125 | ++} |
| 126 | ++ |
| 127 | + // Run starts an asynchronous loop that monitors and updates GKENetworkParamSet in the cluster. |
| 128 | + func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { |
| 129 | + defer utilruntime.HandleCrash() |
0 commit comments