Skip to content

Commit

Permalink
Merge pull request #1091 from jcmoraisjr/jm-status-ingress
Browse files Browse the repository at this point in the history
Status update via merge-patch strategy
  • Loading branch information
jcmoraisjr authored Mar 20, 2024
2 parents 72d85df + 1cb33bf commit 489151b
Show file tree
Hide file tree
Showing 11 changed files with 266 additions and 93 deletions.
30 changes: 12 additions & 18 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import (
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
crcache "sigs.k8s.io/controller-runtime/pkg/cache"
gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2"
gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"
gwapiversioned "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -218,7 +217,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
configLog.Info("watching endpointslices - --enable-endpointslices-api is true")
}

if opt.PublishSvc != "" && opt.PublishAddress != "" {
if opt.PublishService != "" && opt.PublishAddress != "" {
return nil, fmt.Errorf("configure only one of --publish-service or --publish-address")
}

Expand Down Expand Up @@ -256,7 +255,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
}
}

if opt.UpdateStatus && podName == "" && opt.PublishSvc == "" && len(publishAddressHostnames)+len(publishAddressIPs) == 0 {
if opt.UpdateStatus && podName == "" && opt.PublishService == "" && len(publishAddressHostnames)+len(publishAddressIPs) == 0 {
return nil, fmt.Errorf("one of --publish-service, --publish-address or POD_NAME envvar should be configured when --update-status=true")
}

Expand Down Expand Up @@ -317,34 +316,29 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
configLog.Info("using default backend", "service", opt.DefaultSvc)
}

if opt.PublishSvc != "" {
ns, name, err := cache.SplitMetaNamespaceKey(opt.PublishSvc)
if svc := opt.PublishService; svc != "" {
ns, name, err := cache.SplitMetaNamespaceKey(svc)
if err != nil {
return nil, fmt.Errorf("invalid service format: %w", err)
}
svc, err := client.CoreV1().Services(ns).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting information about service '%s': %w", opt.PublishSvc, err)
return nil, fmt.Errorf("error getting information about service '%s': %w", svc, err)
}
if len(svc.Status.LoadBalancer.Ingress) == 0 {
if len(svc.Spec.ExternalIPs) == 0 {
return nil, fmt.Errorf("service '%s' does not (yet) have ingress points", opt.PublishSvc)
return nil, fmt.Errorf("service '%s' does not (yet) have ingress points", svc)
}
configLog.Info("service validated as assigned with externalIP", "service", opt.PublishSvc)
configLog.Info("service validated as assigned with externalIP", "service", svc)
} else {
configLog.Info("service validated as source of Ingress status", "service", opt.PublishSvc)
configLog.Info("service validated as source of Ingress status", "service", svc)
}
}

var watchNamespaces map[string]crcache.Config
if opt.WatchNamespace != "" {
_, err := client.NetworkingV1().Ingresses(opt.WatchNamespace).List(ctx, metav1.ListOptions{Limit: 1})
if err != nil {
return nil, fmt.Errorf("no watchNamespace with name '%s' found: %w", opt.WatchNamespace, err)
}
watchNamespaces = make(map[string]crcache.Config)
for _, ns := range strings.Split(opt.WatchNamespace, ",") {
watchNamespaces[ns] = crcache.Config{}
return nil, fmt.Errorf("no namespace with name '%s' found: %w", opt.WatchNamespace, err)
}
} else {
_, err := client.CoreV1().Services("default").List(ctx, metav1.ListOptions{})
Expand Down Expand Up @@ -481,7 +475,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
Profiling: opt.Profiling,
PublishAddressHostnames: publishAddressHostnames,
PublishAddressIPs: publishAddressIPs,
PublishService: opt.PublishSvc,
PublishService: opt.PublishService,
RateLimitUpdate: opt.RateLimitUpdate,
ReadyzURL: opt.ReadyzURL,
ReloadInterval: opt.ReloadInterval,
Expand All @@ -503,7 +497,7 @@ func CreateWithConfig(ctx context.Context, restConfig *rest.Config, opt *Options
VersionInfo: versionInfo,
WaitBeforeUpdate: opt.WaitBeforeUpdate,
WatchIngressWithoutClass: opt.WatchIngressWithoutClass,
WatchNamespaces: watchNamespaces,
WatchNamespace: opt.WatchNamespace,
}, nil
}

Expand Down Expand Up @@ -681,5 +675,5 @@ type Config struct {
VersionInfo version.Info
WaitBeforeUpdate time.Duration
WatchIngressWithoutClass bool
WatchNamespaces map[string]crcache.Config
WatchNamespace string
}
7 changes: 3 additions & 4 deletions pkg/controller/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type Options struct {
AcmeTokenConfigMapName string
AcmeTrackTLSAnn bool
BucketsResponseTime []float64
PublishSvc string
PublishService string
PublishAddress string
TCPConfigMapName string
AnnPrefix string
Expand Down Expand Up @@ -249,7 +249,7 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
"the haproxy's admin socket. The response time unit is in seconds.",
)

fs.StringVar(&o.PublishSvc, "publish-service", o.PublishSvc, ""+
fs.StringVar(&o.PublishService, "publish-service", o.PublishService, ""+
"Service fronting the ingress controllers. Takes the form namespace/name. The "+
"controller will set the endpoint records on the ingress objects to reflect "+
"those on the service.",
Expand Down Expand Up @@ -297,8 +297,7 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
)

fs.StringVar(&o.WatchNamespace, "watch-namespace", o.WatchNamespace, ""+
"Comma-separated list of namespaces to watch for Ingress. Default is to watch "+
"all namespaces",
"Namespace to watch for Ingress. Default is to watch all namespaces.",
)

fs.DurationVar(&o.StatsCollectProcPeriod, "stats-collect-processing-period", o.StatsCollectProcPeriod, ""+
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/launch/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func Run(cfg *config.Config) error {
ctx := cfg.RootContext

launchLog.Info("configuring manager")
var defaultNamespaces map[string]cache.Config
if cfg.WatchNamespace != "" {
defaultNamespaces = map[string]cache.Config{cfg.WatchNamespace: {}}
}
mgr, err := ctrl.NewManager(cfg.KubeConfig, ctrl.Options{
Logger: rootLogger.WithName("manager"),
Scheme: cfg.Scheme,
Expand All @@ -45,7 +49,7 @@ func Run(cfg *config.Config) error {
},
Cache: cache.Options{
SyncPeriod: cfg.ResyncPeriod,
DefaultNamespaces: cfg.WatchNamespaces,
DefaultNamespaces: defaultNamespaces,
},
})
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,15 @@ type IngressReconciler struct {
// Reconcile ...
func (r *IngressReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
changed := r.watchers.getChangedObjects()
r.Services.ReconcileIngress(changed)
r.Services.ReconcileIngress(ctx, changed)
return ctrl.Result{}, nil
}

func (r *IngressReconciler) leaderChanged(isLeader bool) {
func (r *IngressReconciler) leaderChanged(ctx context.Context, isLeader bool) {
if isLeader && r.watchers.running() {
changed := r.watchers.getChangedObjects()
changed.NeedFullSync = true
r.Services.ReconcileIngress(changed)
r.Services.ReconcileIngress(ctx, changed)
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/controller/reconciler/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ func (w *watchers) handlersCore() []*hdlr {
predicate.Or(
predicate.AnnotationChangedPredicate{},
predicate.GenerationChangedPredicate{},
predicate.NewPredicateFuncs(func(object client.Object) bool {
if w.cfg.PublishService == "" {
return false
}
svc := object.(*api.Service)
return svc.Namespace+"/"+svc.Name == w.cfg.PublishService
}),
),
},
},
Expand Down
10 changes: 4 additions & 6 deletions pkg/controller/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,7 @@ func (s *Services) setup(ctx context.Context) error {
svchealthz := initSvcHealthz(ctx, cfg, metrics, s.acmeExternalCallCheck)
svcstatus := initSvcStatusUpdater(ctx, s.Client)
cache := createCacheFacade(ctx, s.Client, cfg, tracker, sslCerts, dynConfig, svcstatus.update)
var svcstatusing *svcStatusIng
if cfg.UpdateStatus {
svcstatusing = initSvcStatusIng(ctx, cfg, s.Client, cache, svcstatus.update)
}
svcstatusing := initSvcStatusIng(ctx, cfg, s.Client, cache, svcstatus.update)
var acmeClient *svcAcmeClient
var acmeServer *svcAcmeServer
var acmeSigner acme.Signer
Expand Down Expand Up @@ -193,7 +190,7 @@ func (s *Services) withManager(mgr ctrl.Manager) error {
if err := s.svcleader.addRunnable(ctrlutils.DelayedShutdown(s.svcstatus)); err != nil {
return err
}
if s.svcstatusing != nil {
if s.Config.UpdateStatus {
if err := s.svcleader.addRunnable(s.svcstatusing); err != nil {
return err
}
Expand Down Expand Up @@ -251,7 +248,7 @@ func (s *Services) GetIsValidResource() IsValidResource {
}

// ReconcileIngress ...
func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) {
func (s *Services) ReconcileIngress(ctx context.Context, changed *convtypes.ChangedObjects) {
s.modelMutex.Lock()
defer s.modelMutex.Unlock()
s.updateCount++
Expand All @@ -262,6 +259,7 @@ func (s *Services) ReconcileIngress(changed *convtypes.ChangedObjects) {
s.instance.AcmeUpdate()
}
s.instance.HAProxyUpdate(timer)
s.svcstatusing.changed(ctx, changed)
s.log.WithValues("id", s.updateCount).WithValues(timer.AsValues("total")...).Info("finish haproxy update")
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/services/svcleader.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (
)

// SvcLeaderChangedFnc ...
type SvcLeaderChangedFnc func(isLeader bool)
type SvcLeaderChangedFnc func(ctx context.Context, isLeader bool)

func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error) {
r, err := initRecorderProvider(cfg)
Expand All @@ -59,6 +59,7 @@ func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error)
}

s := &svcLeader{
ctx: ctx,
log: logr.FromContextOrDiscard(ctx).WithName("leader"),
}

Expand All @@ -83,6 +84,7 @@ func initSvcLeader(ctx context.Context, cfg *config.Config) (*svcLeader, error)
}

type svcLeader struct {
ctx context.Context
le *leaderelection.LeaderElector
log logr.Logger
runnables []manager.Runnable
Expand Down Expand Up @@ -114,13 +116,13 @@ func (s *svcLeader) OnStartedLeading(ctx context.Context) {
s.rcancel = cancel

for _, f := range s.subscribers {
go f(true)
go f(ctx, true)
}
}

func (s *svcLeader) OnStoppedLeading() {
for _, f := range s.subscribers {
go f(false)
go f(s.ctx, false)
}

if s.rcancel != nil && s.rgroup != nil {
Expand Down
66 changes: 25 additions & 41 deletions pkg/controller/services/svcstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/jcmoraisjr/haproxy-ingress/pkg/utils"
Expand All @@ -41,15 +40,27 @@ func initSvcStatusUpdater(ctx context.Context, client client.Client) *svcStatusU
}

type svcStatusUpdater struct {
client client.Client
ctx context.Context
isleader bool
log logr.Logger
queue utils.Queue
client client.Client
ctx context.Context
run bool
log logr.Logger
queue utils.Queue
}

func (s *svcStatusUpdater) Start(ctx context.Context) error {
s.ctx = ctx
s.run = true
s.queue.RunWithContext(ctx)
s.run = false
return nil
}

func (s *svcStatusUpdater) CanShutdown() bool {
return s.queue.Len() == 0
}

func (s *svcStatusUpdater) update(obj client.Object) {
if s.isleader {
if s.run {
s.queue.Add(obj)
}
}
Expand All @@ -59,41 +70,14 @@ func (s *svcStatusUpdater) notify(item interface{}) error {
namespace := obj.GetNamespace()
name := obj.GetName()
log := s.log.WithValues("kind", reflect.TypeOf(obj), "namespace", namespace, "name", name)
if err := s.client.Status().Update(s.ctx, obj); err != nil {
// usually `obj` is up to date, but in case of a concurrent
// update, we'll refresh the object into a new instance and
// copy the updated status to it.
typ := reflect.TypeOf(obj)
if typ.Kind() == reflect.Pointer {
typ = typ.Elem()
}
new := reflect.New(typ).Interface().(client.Object)
if err := s.client.Get(s.ctx, types.NamespacedName{Namespace: namespace, Name: name}, new); err != nil {
log.Error(err, "cannot read status")
return err
}
// a reflection trick to copy the updated status from the outdated object to the new updated one
reflect.ValueOf(new).Elem().FieldByName("Status").Set(
reflect.ValueOf(obj).Elem().FieldByName("Status"))
if err := s.client.Status().Update(s.ctx, new); err != nil {
log.Error(err, "cannot update status")
return err
}

from := obj.DeepCopyObject().(client.Object)
reflect.ValueOf(from).Elem().FieldByName("Status").SetZero()
if err := s.client.Status().Patch(s.ctx, obj, client.MergeFrom(from)); err != nil {
log.Error(err, "cannot update status")
return err
}
log.V(1).Info("status updated")
return nil
}

func (s *svcStatusUpdater) Start(ctx context.Context) error {
s.ctx = ctx
s.isleader = true
s.queue.RunWithContext(ctx)
s.isleader = false
// s.ctx wasn't cleaned up here so lazy notifications
// doesn't crashloop due to nil ctx.
log.V(1).Info("status updated")
return nil
}

func (s *svcStatusUpdater) CanShutdown() bool {
return s.queue.Len() == 0
}
Loading

0 comments on commit 489151b

Please sign in to comment.