diff --git a/pkg/controller/config/config.go b/pkg/controller/config/config.go
index 344102b5..064e378c 100644
--- a/pkg/controller/config/config.go
+++ b/pkg/controller/config/config.go
@@ -47,7 +47,6 @@ import (
 	clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
-	crcache "sigs.k8s.io/controller-runtime/pkg/cache"
 	gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
 	gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
 	gwapiversioned "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
@@ -218,7 +217,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
 		configLog.Info("watching endpointslices - --enable-endpointslices-api is true")
 	}
 
-	if opt.PublishSvc != "" && opt.PublishAddress != "" {
+	if opt.PublishService != "" && opt.PublishAddress != "" {
 		return nil, fmt.Errorf("configure only one of --publish-service or --publish-address")
 	}
 
@@ -256,7 +255,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
 		}
 	}
 
-	if opt.UpdateStatus && podName == "" && opt.PublishSvc == "" && len(publishAddressHostnames)+len(publishAddressIPs) == 0 {
+	if opt.UpdateStatus && podName == "" && opt.PublishService == "" && len(publishAddressHostnames)+len(publishAddressIPs) == 0 {
 		return nil, fmt.Errorf("one of --publish-service, --publish-address or POD_NAME envvar should be configured when --update-status=true")
 	}
 
@@ -317,34 +316,29 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
 		configLog.Info("using default backend", "service", opt.DefaultSvc)
 	}
 
-	if opt.PublishSvc != "" {
-		ns, name, err := cache.SplitMetaNamespaceKey(opt.PublishSvc)
+	if opt.PublishService != "" {
+		ns, name, err := cache.SplitMetaNamespaceKey(opt.PublishService)
 		if err != nil {
 			return nil, fmt.Errorf("invalid service format: %w", err)
 		}
 		svc, err := client.CoreV1().Services(ns).Get(ctx, name, metav1.GetOptions{})
 		if err != nil {
-			return nil, fmt.Errorf("error getting information about service '%s': %w", opt.PublishSvc, err)
+			return nil, fmt.Errorf("error getting information about service '%s': %w", opt.PublishService, err)
 		}
 		if len(svc.Status.LoadBalancer.Ingress) == 0 {
 			if len(svc.Spec.ExternalIPs) == 0 {
-				return nil, fmt.Errorf("service '%s' does not (yet) have ingress points", opt.PublishSvc)
+				return nil, fmt.Errorf("service '%s' does not (yet) have ingress points", opt.PublishService)
 			}
-			configLog.Info("service validated as assigned with externalIP", "service", opt.PublishSvc)
+			configLog.Info("service validated as assigned with externalIP", "service", opt.PublishService)
 		} else {
-			configLog.Info("service validated as source of Ingress status", "service", opt.PublishSvc)
+			configLog.Info("service validated as source of Ingress status", "service", opt.PublishService)
 		}
 	}
 
-	var watchNamespaces map[string]crcache.Config
 	if opt.WatchNamespace != "" {
 		_, err := client.NetworkingV1().Ingresses(opt.WatchNamespace).List(ctx, metav1.ListOptions{Limit: 1})
 		if err != nil {
-			return nil, fmt.Errorf("no watchNamespace with name '%s' found: %w", opt.WatchNamespace, err)
-		}
-		watchNamespaces = make(map[string]crcache.Config)
-		for _, ns := range strings.Split(opt.WatchNamespace, ",") {
-			watchNamespaces[ns] = crcache.Config{}
+			return nil, fmt.Errorf("no namespace with name '%s' found: %w", opt.WatchNamespace, err)
 		}
 	} else {
 		_, err := client.CoreV1().Services("default").List(ctx, metav1.ListOptions{})
@@ -481,7 +475,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
 		Profiling:                opt.Profiling,
 		PublishAddressHostnames:  publishAddressHostnames,
 		PublishAddressIPs:        publishAddressIPs,
-		PublishService:           opt.PublishSvc,
+		PublishService:           opt.PublishService,
 		RateLimitUpdate:          opt.RateLimitUpdate,
 		ReadyzURL:                opt.ReadyzURL,
 		ReloadInterval:           opt.ReloadInterval,
@@ -503,7 +497,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
 		VersionInfo:              versionInfo,
 		WaitBeforeUpdate:         opt.WaitBeforeUpdate,
 		WatchIngressWithoutClass: opt.WatchIngressWithoutClass,
-		WatchNamespaces:          watchNamespaces,
+		WatchNamespace:           opt.WatchNamespace,
 	}, nil
 }
 
@@ -681,5 +675,5 @@ type Config struct {
 	VersionInfo              version.Info
 	WaitBeforeUpdate         time.Duration
 	WatchIngressWithoutClass bool
-	WatchNamespaces          map[string]crcache.Config
+	WatchNamespace           string
 }
diff --git a/pkg/controller/config/options.go b/pkg/controller/config/options.go
index b6180652..77827543 100644
--- a/pkg/controller/config/options.go
+++ b/pkg/controller/config/options.go
@@ -64,7 +64,7 @@ type Options struct {
 	AcmeTokenConfigMapName string
 	AcmeTrackTLSAnn        bool
 	BucketsResponseTime    []float64
-	PublishSvc             string
+	PublishService         string
 	PublishAddress         string
 	TCPConfigMapName       string
 	AnnPrefix              string
@@ -249,7 +249,7 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
 		"the haproxy's admin socket. The response time unit is in seconds.",
 	)
-	fs.StringVar(&o.PublishSvc, "publish-service", o.PublishSvc, ""+
+	fs.StringVar(&o.PublishService, "publish-service", o.PublishService, ""+
 		"Service fronting the ingress controllers. Takes the form namespace/name. The "+
 		"controller will set the endpoint records on the ingress objects to reflect "+
 		"those on the service.",
 	)
@@ -297,8 +297,7 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
 	)
 
 	fs.StringVar(&o.WatchNamespace, "watch-namespace", o.WatchNamespace, ""+
-		"Comma-separated list of namespaces to watch for Ingress. Default is to watch "+
-		"all namespaces",
+		"Namespace to watch for Ingress. Default is to watch all namespaces.",
 	)
 
 	fs.DurationVar(&o.StatsCollectProcPeriod, "stats-collect-processing-period", o.StatsCollectProcPeriod, ""+
diff --git a/pkg/controller/launch/launch.go b/pkg/controller/launch/launch.go
index fdea60b7..5640621a 100644
--- a/pkg/controller/launch/launch.go
+++ b/pkg/controller/launch/launch.go
@@ -35,6 +35,11 @@ func Run(cfg *config.Config) error {
 	ctx := cfg.RootContext
 
 	launchLog.Info("configuring manager")
+	// a nil DefaultNamespaces map makes the manager's cache watch all namespaces.
+	var defaultNamespaces map[string]cache.Config
+	if cfg.WatchNamespace != "" {
+		defaultNamespaces = map[string]cache.Config{cfg.WatchNamespace: {}}
+	}
 	mgr, err := ctrl.NewManager(cfg.KubeConfig, ctrl.Options{
 		Logger: rootLogger.WithName("manager"),
 		Scheme: cfg.Scheme,
@@ -45,7 +50,7 @@ func Run(cfg *config.Config) error {
 		},
 		Cache: cache.Options{
 			SyncPeriod:        cfg.ResyncPeriod,
-			DefaultNamespaces: cfg.WatchNamespaces,
+			DefaultNamespaces: defaultNamespaces,
 		},
 	})
 	if err != nil {
diff --git a/pkg/controller/reconciler/reconciler.go b/pkg/controller/reconciler/reconciler.go
index 46a3d92a..ad7cd7d0 100644
--- a/pkg/controller/reconciler/reconciler.go
+++ b/pkg/controller/reconciler/reconciler.go
@@ -42,15 +42,15 @@ type IngressReconciler struct {
 
 // Reconcile ...
 func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	changed := r.watchers.getChangedObjects()
-	r.Services.ReconcileIngress(changed)
+	r.Services.ReconcileIngress(ctx, changed)
 	return ctrl.Result{}, nil
 }
 
-func (r *IngressReconciler) leaderChanged(isLeader bool) {
+func (r *IngressReconciler) leaderChanged(ctx context.Context, isLeader bool) {
 	if isLeader && r.watchers.running() {
 		changed := r.watchers.getChangedObjects()
 		changed.NeedFullSync = true
-		r.Services.ReconcileIngress(changed)
+		r.Services.ReconcileIngress(ctx, changed)
 	}
 }
diff --git a/pkg/controller/reconciler/watchers.go b/pkg/controller/reconciler/watchers.go
index 414d112a..df3022e9 100644
--- a/pkg/controller/reconciler/watchers.go
+++ b/pkg/controller/reconciler/watchers.go
@@ -143,6 +143,14 @@ func (w *watchers) handlersCore() []*hdlr {
 				predicate.Or(
 					predicate.AnnotationChangedPredicate{},
 					predicate.GenerationChangedPredicate{},
+					// reconcile on any change to the --publish-service service, so ingress status follows its address.
+					predicate.NewPredicateFuncs(func(object client.Object) bool {
+						if w.cfg.PublishService == "" {
+							return false
+						}
+						svc := object.(*api.Service)
+						return svc.Namespace+"/"+svc.Name == w.cfg.PublishService
+					}),
 				),
 			},
 		},
diff --git a/pkg/controller/services/services.go b/pkg/controller/services/services.go
index 90b61d1e..7b247b8b 100644
--- a/pkg/controller/services/services.go
+++ b/pkg/controller/services/services.go
@@ -105,10 +105,7 @@ func (s *Services) setup(ctx context.Context) error {
 	svchealthz := initSvcHealthz(ctx, cfg, metrics, s.acmeExternalCallCheck)
 	svcstatus := initSvcStatusUpdater(ctx, s.Client)
 	cache := createCacheFacade(ctx, s.Client, cfg, tracker, sslCerts, dynConfig, svcstatus.update)
-	var svcstatusing *svcStatusIng
-	if cfg.UpdateStatus {
-		svcstatusing = initSvcStatusIng(ctx, cfg, s.Client, cache, svcstatus.update)
-	}
+	svcstatusing := initSvcStatusIng(ctx, cfg, s.Client, cache, svcstatus.update)
 	var acmeClient *svcAcmeClient
 	var acmeServer *svcAcmeServer
 	var acmeSigner acme.Signer
@@ -193,7 +190,7 @@ func (s *Services) withManager(mgr ctrl.Manager) error {
 	if err := s.svcleader.addRunnable(ctrlutils.DelayedShutdown(s.svcstatus)); err != nil {
 		return err
 	}
-	if s.svcstatusing != nil {
+	if s.Config.UpdateStatus {
 		if err := s.svcleader.addRunnable(s.svcstatusing); err != nil {
 			return err
 		}
@@ -251,7 +248,7 @@ func (s *Services) GetIsValidResource() IsValidResource {
 }
 
 // ReconcileIngress ...
-func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) {
+func (s *Services) ReconcileIngress(ctx context.Context, changed *convtypes.ChangedObjects) {
 	s.modelMutex.Lock()
 	defer s.modelMutex.Unlock()
 	s.updateCount++
@@ -262,6 +259,7 @@ func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) {
 		s.instance.AcmeUpdate()
 	}
 	s.instance.HAProxyUpdate(timer)
+	s.svcstatusing.changed(ctx, changed)
 	s.log.WithValues("id", s.updateCount).WithValues(timer.AsValues("total")...).Info("finish haproxy update")
 }
 
diff --git a/pkg/controller/services/svcleader.go b/pkg/controller/services/svcleader.go
index 2fb6858a..c3260e8e 100644
--- a/pkg/controller/services/svcleader.go
+++ b/pkg/controller/services/svcleader.go
@@ -40,7 +40,7 @@ const (
 )
 
 // SvcLeaderChangedFnc ...
-type SvcLeaderChangedFnc func(isLeader bool)
+type SvcLeaderChangedFnc func(ctx context.Context, isLeader bool)
 
 func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error) {
 	r, err := initRecorderProvider(cfg)
@@ -59,6 +59,7 @@ func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error)
 	}
 
 	s := &svcLeader{
+		ctx: ctx,
 		log: logr.FromContextOrDiscard(ctx).WithName("leader"),
 	}
 
@@ -83,6 +84,8 @@ func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error)
 }
 
 type svcLeader struct {
+	// ctx is the controller's root context, used by OnStoppedLeading, which receives no context of its own.
+	ctx       context.Context
 	le        *leaderelection.LeaderElector
 	log       logr.Logger
 	runnables []manager.Runnable
@@ -114,13 +117,13 @@ func (s *svcLeader) OnStartedLeading(ctx context.Context) {
 	s.rcancel = cancel
 
 	for _, f := range s.subscribers {
-		go f(true)
+		go f(ctx, true)
 	}
 }
 
 func (s *svcLeader) OnStoppedLeading() {
 	for _, f := range s.subscribers {
-		go f(false)
+		go f(s.ctx, false)
 	}
 
 	if s.rcancel != nil && s.rgroup != nil {
diff --git a/pkg/controller/services/svcstatus.go b/pkg/controller/services/svcstatus.go
index 0041ef71..97bf1055 100644
--- a/pkg/controller/services/svcstatus.go
+++ b/pkg/controller/services/svcstatus.go
@@ -40,18 +40,18 @@ func initSvcStatusUpdater(ctx context.Context, client client.Client) *svcStatusU
 }
 
 type svcStatusUpdater struct {
-	client  client.Client
-	ctx     context.Context
-	running bool
-	log     logr.Logger
-	queue   utils.Queue
+	client client.Client
+	ctx    context.Context
+	run    bool
+	log    logr.Logger
+	queue  utils.Queue
 }
 
 func (s *svcStatusUpdater) Start(ctx context.Context) error {
 	s.ctx = ctx
-	s.running = true
+	s.run = true
 	s.queue.RunWithContext(ctx)
-	s.running = false
+	s.run = false
 	return nil
 }
 
@@ -60,7 +60,7 @@ func (s *svcStatusUpdater) CanShutdown() bool {
 }
 
 func (s *svcStatusUpdater) update(obj client.Object) {
-	if s.running {
+	if s.run {
 		s.queue.Add(obj)
 	}
 }
diff --git a/pkg/controller/services/svcstatusing.go b/pkg/controller/services/svcstatusing.go
index 877ec93c..53129db9 100644
--- a/pkg/controller/services/svcstatusing.go
+++ b/pkg/controller/services/svcstatusing.go
@@ -18,9 +18,11 @@
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"reflect"
 	"sort"
+	"strings"
 	"time"
 
 	"github.com/go-logr/logr"
@@ -28,11 +30,11 @@ import (
 	networking "k8s.io/api/networking/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/wait"
-	clientcache "k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	"github.com/jcmoraisjr/haproxy-ingress/pkg/controller/config"
+	convtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/types"
 )
 
 func initSvcStatusIng(ctx context.Context, config *config.Config, client client.Client, cache *c, status svcStatusUpdateFnc) *svcStatusIng {
@@ -53,6 +55,7 @@ type svcStatusIng struct {
 	log    logr.Logger
 	cfg    *config.Config
 	cli    client.Client
+	run    bool
 	cache  *c
 	status svcStatusUpdateFnc
 	period time.Duration
@@ -60,7 +63,10 @@ type svcStatusIng struct {
 }
 
 func (s *svcStatusIng) Start(ctx context.Context) error {
-	wait.UntilWithContext(ctx, s.sync, s.period)
+	s.run = true
+	<-ctx.Done()
+	s.run = false
+	// we need a new context, the one provided by the controller is canceled already.
 	// this context's timeout is 90% of the manager's shutdown timeout
 	timeout := *s.cfg.ShutdownTimeout * 9 / 10
 
@@ -70,6 +76,46 @@ func (s *svcStatusIng) Start(ctx context.Context) error {
 	return nil
 }
 
+func (s *svcStatusIng) changed(ctx context.Context, changed *convtypes.ChangedObjects) {
+	if !s.run {
+		return
+	}
+	s.sync(ctx)
+
+	// Objects ([]string) has currently the following syntax:
+	// <event-type>/<resource-type>:[<namespace>/]<name>
+	// Need to move to a structured type.
+	addIngPrefix := "add/" + string(convtypes.ResourceIngress) + ":"
+	svcPublSuffix := "/" + string(convtypes.ResourceService) + ":" + s.cfg.PublishService
+
+	var errs []error
+	for _, obj := range changed.Objects {
+		if strings.HasPrefix(obj, addIngPrefix) {
+			fullname := obj[len(addIngPrefix):]
+			ns, n, _ := cache.SplitMetaNamespaceKey(fullname)
+			ing := networking.Ingress{}
+			err := s.cli.Get(ctx, types.NamespacedName{Namespace: ns, Name: n}, &ing)
+			if err != nil {
+				errs = append(errs, err)
+				continue
+			}
+			ing.Status.LoadBalancer.Ingress = s.curr
+			s.status(&ing)
+		} else if strings.HasSuffix(obj, svcPublSuffix) {
+			s.log.Info("publish service updated, updating all ingress status", "name", s.cfg.PublishService)
+			if err := s.update(ctx, s.curr); err != nil {
+				errs = append(errs, err)
+			}
+		}
+	}
+	if len(errs) > 0 {
+		if len(errs) > 5 {
+			errs = errs[:5]
+		}
+		s.log.Error(errors.Join(errs...), "error syncing ingress status")
+	}
+}
+
 func (s *svcStatusIng) update(_ context.Context, lb []networking.IngressLoadBalancerIngress) error {
 	ingList, err := s.cache.GetIngressList()
 	if err != nil {
@@ -95,7 +141,9 @@ func (s *svcStatusIng) shutdown(ctx context.Context) {
 		return
 	}
 	s.log.Info("no other controller running, removing address from ingress status")
-	s.update(ctx, nil)
+	if err := s.update(ctx, nil); err != nil {
+		s.log.Error(err, "error listing ingress resources for status update")
+	}
 }
 
@@ -103,7 +151,7 @@ func (s *svcStatusIng) sync(ctx context.Context) {
 	if s.cfg.PublishService != "" {
 		// read Hostnames and IPs from the configured service
 		svc := api.Service{}
-		ns, n, _ := clientcache.SplitMetaNamespaceKey(s.cfg.PublishService)
+		ns, n, _ := cache.SplitMetaNamespaceKey(s.cfg.PublishService)
 		if err := s.cli.Get(ctx, types.NamespacedName{Namespace: ns, Name: n}, &svc); err != nil {
 			s.log.Error(err, "failed to read load balancer service")
 			return
@@ -131,7 +179,7 @@ func (s *svcStatusIng) sync(ctx context.Context) {
 	} else {
 		// fall back to an empty list and log an error if everything else failed
 		s.log.Error(fmt.Errorf("cannot configure ingress status due to a failure reading the published hostnames/IPs"), ""+
-			"either fix the configuration or the permission failures, "+
+			"error configuring ingress status, either fix the configuration or the permission failures, "+
 			"configure --publish-service or --publish-address command-line options, "+
 			"or disable status update with --update-status=false")
 	}
@@ -143,7 +191,7 @@ func (s *svcStatusIng) sync(ctx context.Context) {
 	})
 	if !reflect.DeepEqual(s.curr, lb) {
 		if err := s.update(ctx, lb); err != nil {
-			s.log.Error(err, "failed to update ingress resources")
+			s.log.Error(err, "error updating ingress resources")
 			return
 		}
 		s.curr = lb
diff --git a/tests/framework/framework.go b/tests/framework/framework.go
index 6eea2c95..f9d9d3e9 100644
--- a/tests/framework/framework.go
+++ b/tests/framework/framework.go
@@ -37,7 +37,9 @@ import (
 )
 
 const (
-	PublishAddress = "10.0.1.1"
+	PublishSvcName  = "default/publish"
+	PublishAddress  = "10.0.1.1"
+	PublishHostname = "ingress.local"
 )
 
 func NewFramework(ctx context.Context, t *testing.T) *framework {
@@ -157,10 +159,23 @@ func (f *framework) StartController(ctx context.Context, t *testing.T) {
 	err = f.cli.Create(ctx, &global)
 	require.NoError(t, err)
 
+	publishService := corev1.Service{}
+	publishService.Namespace = "default"
+	publishService.Name = "publish"
+	publishService.Spec.Type = corev1.ServiceTypeLoadBalancer
+	publishService.Spec.Ports = []corev1.ServicePort{{Port: 80}}
+	err = f.cli.Create(ctx, &publishService)
+	require.NoError(t, err)
+	publishService.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{
+		{IP: PublishAddress, Hostname: PublishHostname},
+	}
+	err = f.cli.Status().Update(ctx, &publishService)
+	require.NoError(t, err)
+
 	opt := ctrlconfig.NewOptions()
 	opt.MasterWorker = true
 	opt.LocalFSPrefix = "/tmp/haproxy-ingress"
-	opt.PublishAddress = PublishAddress
+	opt.PublishService = PublishSvcName
 	opt.ConfigMap = "default/ingress-controller"
 	os.Setenv("POD_NAMESPACE", "default")
 	ctx, cancel := context.WithCancel(ctx)
diff --git a/tests/integration/integration_test.go b/tests/integration/integration_test.go
index 08b532ae..30d6371c 100644
--- a/tests/integration/integration_test.go
+++ b/tests/integration/integration_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/stretchr/testify/require"
 	corev1 "k8s.io/api/core/v1"
 	networkingv1 "k8s.io/api/networking/v1"
+	"k8s.io/client-go/tools/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	ingtypes "github.com/jcmoraisjr/haproxy-ingress/pkg/converters/ingress/types"
@@ -89,21 +90,91 @@ func TestIntegration(t *testing.T) {
 
 	expectedIngressStatus := networkingv1.IngressStatus{
 		LoadBalancer: networkingv1.IngressLoadBalancerStatus{
-			Ingress: []networkingv1.IngressLoadBalancerIngress{{IP: framework.PublishAddress}},
+			Ingress: []networkingv1.IngressLoadBalancerIngress{
+				{IP: framework.PublishAddress, Hostname: framework.PublishHostname},
+			},
 		},
 	}
 
 	t.Run("should update ingress status", func(t *testing.T) {
 		t.Parallel()
 		svc := f.CreateService(ctx, t, httpPort)
+
+		ing1 := f.CreateIngress(ctx, t, svc)
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			err := f.Client().Get(ctx, client.ObjectKeyFromObject(ing1), ing1)
+			if !assert.NoError(collect, err) {
+				return
+			}
+			assert.Equal(collect, expectedIngressStatus, ing1.Status)
+		}, 5*time.Second, time.Second)
+
+		// testing two consecutive syncs
+		ing2 := f.CreateIngress(ctx, t, svc)
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			err := f.Client().Get(ctx, client.ObjectKeyFromObject(ing2), ing2)
+			if !assert.NoError(collect, err) {
+				return
+			}
+			assert.Equal(collect, expectedIngressStatus, ing2.Status)
+		}, 5*time.Second, time.Second)
+	})
+
+	t.Run("should sync ingress status from publish service", func(t *testing.T) {
+		t.Parallel()
+		svc := f.CreateService(ctx, t, httpPort)
 		ing := f.CreateIngress(ctx, t, svc)
+
+		// check initial status
 		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
 			err := f.Client().Get(ctx, client.ObjectKeyFromObject(ing), ing)
 			if !assert.NoError(collect, err) {
 				return
 			}
 			assert.Equal(collect, expectedIngressStatus, ing.Status)
-		}, 10*time.Second, time.Second)
+		}, 5*time.Second, time.Second)
+
+		tmpChangingIP := "127.0.0.1"
+		require.NotEqual(t, framework.PublishAddress, tmpChangingIP)
+
+		// read and update publish svc status
+		svcpub := corev1.Service{}
+		svcpub.Namespace, svcpub.Name, _ = cache.SplitMetaNamespaceKey(framework.PublishSvcName)
+		err := f.Client().Get(ctx, client.ObjectKeyFromObject(&svcpub), &svcpub)
+		require.NoError(t, err)
+		svcpublb := svcpub.Status.LoadBalancer.Ingress
+		svcpub.Status.LoadBalancer.Ingress = []corev1.LoadBalancerIngress{{IP: tmpChangingIP}}
+		err = f.Client().Status().Update(ctx, &svcpub)
+		require.NoError(t, err)
+
+		// check changed svc status
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			err := f.Client().Get(ctx, client.ObjectKeyFromObject(ing), ing)
+			if !assert.NoError(collect, err) {
+				return
+			}
+			assert.Equal(collect, networkingv1.IngressStatus{
+				LoadBalancer: networkingv1.IngressLoadBalancerStatus{
+					Ingress: []networkingv1.IngressLoadBalancerIngress{
+						{IP: tmpChangingIP},
+					},
+				},
+			}, ing.Status)
+		}, 5*time.Second, time.Second)
+
+		// recover initial svc status
+		svcpub.Status.LoadBalancer.Ingress = svcpublb
+		err = f.Client().Status().Update(ctx, &svcpub)
+		require.NoError(t, err)
+
+		// check recovered svc status
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			err := f.Client().Get(ctx, client.ObjectKeyFromObject(ing), ing)
+			if !assert.NoError(collect, err) {
+				return
+			}
+			assert.Equal(collect, expectedIngressStatus, ing.Status)
+		}, 5*time.Second, time.Second)
 	})
 
 	t.Run("should override old status", func(t *testing.T) {
@@ -114,6 +185,12 @@ func TestIntegration(t *testing.T) {
 			return
 		}
 		assert.Equal(collect, expectedIngressStatus, ingpre1.Status)
-	}, 10*time.Second, time.Second)
+	}, 5*time.Second, time.Second)
 	})
+
+	// should update status on class update
+
+	// should limit read and update when watching namespace
+
+	// should sync status on new leader
 }