Skip to content

Commit

Permalink
BUG: this commit fixes the two following related issues:
Browse files Browse the repository at this point in the history
- ip addresses of ingresses updated too slowly with publish service ip address.
- creation of ingress delayed by unnecessary repeating operations or synchronous operations on previous point.
  • Loading branch information
ivanmatmati committed May 31, 2022
1 parent 2865276 commit ea7b243
Show file tree
Hide file tree
Showing 13 changed files with 414 additions and 272 deletions.
21 changes: 10 additions & 11 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ func (c *HAProxyController) Start() {
logger.Printf("Running on Kubernetes version: %s %s", k8sVersion.String(), k8sVersion.Platform)
}

c.Store.UpdateStatusFunc = ingress.NewStatusIngressUpdater(c.k8s.API, c.Store, c.OSArgs.IngressClass, c.OSArgs.EmptyIngressClass)

// Monitor k8s events
var chanSize int64 = int64(watch.DefaultChanSize * 6)
if c.OSArgs.ChannelSize > 0 {
Expand All @@ -124,11 +126,6 @@ func (c *HAProxyController) Start() {
logger.Infof("Channel size: %d", chanSize)
c.eventChan = make(chan SyncDataEvent, chanSize)
go c.monitorChanges()
if c.PublishService != nil {
// Update Ingress status
c.ingressChan = make(chan ingress.Sync, chanSize)
go ingress.UpdateStatus(c.k8s.API, c.Store, c.OSArgs.IngressClass, c.OSArgs.EmptyIngressClass, c.ingressChan)
}
}

// Stop handles shutting down HAProxyController
Expand All @@ -141,6 +138,7 @@ func (c *HAProxyController) Stop() {
func (c *HAProxyController) updateHAProxy() {
var reload bool
var err error

logger.Trace("HAProxy config sync started")

err = c.Client.APIStartTransaction()
Expand All @@ -160,26 +158,27 @@ func (c *HAProxyController) updateHAProxy() {
logger.Error(route.CustomRoutesReset(c.Client))
}

ingresses := []*ingress.Ingress{}
for _, namespace := range c.Store.Namespaces {
if !namespace.Relevant {
continue
}
c.Store.SecretsProcessed = map[string]struct{}{}
for _, ingResource := range namespace.Ingresses {
i := ingress.New(c.Store, ingResource, c.OSArgs.IngressClass, c.OSArgs.EmptyIngressClass)
if i == nil {
logger.Debugf("ingress '%s/%s' ignored: no matching IngressClass", ingResource.Namespace, ingResource.Name)
continue
}
if c.PublishService != nil && ingResource.Status == ADDED {
select {
case c.ingressChan <- ingress.Sync{Ingress: ingResource}:
default:
logger.Errorf("Ingress %s/%s: unable to sync status: sync channel full", ingResource.Namespace, ingResource.Name)
}
if ingResource.Status == ADDED {
ingresses = append(ingresses, i)
}
c.reload = i.Update(c.Store, &c.Cfg, c.Client) || c.reload
}
}
if len(ingresses) > 0 {
go ingress.UpdatePublishService(ingresses, c.k8s.API, c.Store.PublishServiceAddresses)
}

for _, handler := range c.updateHandlers {
reload, err = handler.Update(c.Store, &c.Cfg, c.Client)
Expand Down
17 changes: 16 additions & 1 deletion controller/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,15 @@ func (i *Ingress) handlePath(k store.K8s, cfg *configuration.ControllerCfg, api
if err != nil {
return
}

// Backend
backendReload, err := svc.HandleBackend(api, k)
if err != nil {
return
}

backendName, _ := svc.GetBackendName()

// Route
var routeReload bool
ingRoute := route.Route{
Expand All @@ -119,7 +122,12 @@ func (i *Ingress) handlePath(k store.K8s, cfg *configuration.ControllerCfg, api
}
cfg.ActiveBackends[backendName] = struct{}{}
// Endpoints
endpointsReload := svc.HandleHAProxySrvs(api, k)
service := svc.GetResource()
var endpointsReload bool
if _, ok := k.ServiceProcessed[service.Namespace+"/"+service.Name]; !ok {
endpointsReload = svc.HandleHAProxySrvs(api, k)
k.ServiceProcessed[service.Namespace+"/"+service.Name] = struct{}{}
}
return backendReload || endpointsReload || routeReload, err
}

Expand Down Expand Up @@ -192,14 +200,19 @@ func (i *Ingress) Update(k store.K8s, cfg *configuration.ControllerCfg, api api.
logger.Infof("Setting http default backend to '%s'", backendName)
}
}

// Ingress secrets
logger.Tracef("Ingress '%s/%s': processing secrets...", i.resource.Namespace, i.resource.Name)
for _, tls := range i.resource.TLS {
if _, ok := k.SecretsProcessed[i.resource.Namespace+"/"+tls.SecretName]; ok {
continue
}
secret, secErr := k.GetSecret(i.resource.Namespace, tls.SecretName)
if secErr != nil {
logger.Warningf("Ingress '%s/%s': %s", i.resource.Namespace, i.resource.Name, secErr)
continue
}
k.SecretsProcessed[i.resource.Namespace+"/"+tls.SecretName] = struct{}{}
_, err := cfg.Certificates.HandleTLSSecret(secret, certs.FT_CERT)
logger.Error(err)
}
Expand All @@ -217,6 +230,7 @@ func (i *Ingress) Update(k store.K8s, cfg *configuration.ControllerCfg, api api.
cfg.SSLPassthrough = true
}
i.HandleAnnotations(k, cfg)

// Ingress rules
logger.Tracef("ingress '%s/%s': processing rules...", i.resource.Namespace, i.resource.Name)
for _, rule := range i.resource.Rules {
Expand All @@ -228,5 +242,6 @@ func (i *Ingress) Update(k store.K8s, cfg *configuration.ControllerCfg, api api.
}
}
}

return
}
82 changes: 19 additions & 63 deletions controller/ingress/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,79 +14,28 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/haproxytech/kubernetes-ingress/controller/store"
"github.com/haproxytech/kubernetes-ingress/controller/utils"
)

func UpdateStatus(client *kubernetes.Clientset, k store.K8s, class string, emptyClass bool, channel chan Sync) {
var i *Ingress
addresses := []string{}
for sync := range channel {
// Published Service updated: Update all Ingresses
if sync.Service != nil && getServiceAddresses(sync.Service, &addresses) {
logger.Debug("Addresses of Ingress Controller service changed, status of all ingress resources are going to be updated")
for _, ns := range k.Namespaces {
if !ns.Relevant {
continue
}
for _, ingress := range k.Namespaces[ns.Name].Ingresses {
if i = New(k, ingress, class, emptyClass); i != nil {
logger.Error(i.updateStatus(client, addresses))
}
}
type UpdateStatus func(service store.Service, ingresses []*store.Ingress)

func NewStatusIngressUpdater(client *kubernetes.Clientset, k store.K8s, class string, emptyClass bool) UpdateStatus {
return func(service store.Service, ingresses []*store.Ingress) {
for _, ingress := range ingresses {
if ing := New(k, ingress, class, emptyClass); ing != nil {
logger.Error(ing.UpdateStatus(client, service.Addresses))
}
} else if i = New(k, sync.Ingress, class, emptyClass); i != nil {
// Update single Ingress
logger.Error(i.updateStatus(client, addresses))
}
}
}

func getServiceAddresses(service *corev1.Service, curAddr *[]string) (updated bool) {
addresses := []string{}
switch service.Spec.Type {
case corev1.ServiceTypeExternalName:
addresses = []string{service.Spec.ExternalName}
case corev1.ServiceTypeClusterIP:
addresses = []string{service.Spec.ClusterIP}
case corev1.ServiceTypeNodePort:
if service.Spec.ExternalIPs != nil {
addresses = append(addresses, service.Spec.ExternalIPs...)
} else {
addresses = append(addresses, service.Spec.ClusterIP)
}
case corev1.ServiceTypeLoadBalancer:
for _, ip := range service.Status.LoadBalancer.Ingress {
if ip.IP == "" {
addresses = append(addresses, ip.Hostname)
} else {
addresses = append(addresses, ip.IP)
}
}
addresses = append(addresses, service.Spec.ExternalIPs...)
default:
logger.Errorf("Unable to extract IP address/es from service %s/%s", service.Namespace, service.Name)
return
}
func (i *Ingress) UpdateStatus(client *kubernetes.Clientset, addresses []string) (err error) {
var lbi []corev1.LoadBalancerIngress

if len(*curAddr) != len(addresses) {
updated = true
*curAddr = addresses
if utils.EqualSliceStringsWithoutOrder(i.resource.Addresses, addresses) {
return
}
for i, address := range addresses {
if address != (*curAddr)[i] {
updated = true
break
}
}
if updated {
*curAddr = addresses
}
return
}

func (i *Ingress) updateStatus(client *kubernetes.Clientset, addresses []string) (err error) {
logger.Tracef("Updating status of Ingress %s/%s", i.resource.Namespace, i.resource.Name)
var lbi []corev1.LoadBalancerIngress
for _, addr := range addresses {
if net.ParseIP(addr) == nil {
lbi = append(lbi, corev1.LoadBalancerIngress{Hostname: addr})
Expand Down Expand Up @@ -134,6 +83,13 @@ func (i *Ingress) updateStatus(client *kubernetes.Clientset, addresses []string)
return fmt.Errorf("failed to update LoadBalancer status of ingress %s/%s: %w", i.resource.Namespace, i.resource.Name, err)
}
logger.Tracef("Successful update of LoadBalancer status in ingress %s/%s", i.resource.Namespace, i.resource.Name)

// Allow to store the publish service addresses affected to the ingress for future comparison in update test.
i.resource.Addresses = addresses
return nil
}

func UpdatePublishService(ingresses []*Ingress, api *kubernetes.Clientset, publishServiceAddresses []string) {
for _, i := range ingresses {
logger.Error(i.UpdateStatus(api, publishServiceAddresses))
}
}
58 changes: 49 additions & 9 deletions controller/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,8 +474,11 @@ func (k *K8s) EventsServices(channel chan SyncDataEvent, ingressChan chan ingres
}
k.Logger.Tracef("%s %s: %s", SERVICE, item.Status, item.Name)
channel <- SyncDataEvent{SyncType: SERVICE, Namespace: item.Namespace, Data: item}
if publishSvc != nil && publishSvc.Namespace == data.Namespace && publishSvc.Name == data.Name {
ingressChan <- ingress.Sync{Service: data}
if publishSvc != nil && publishSvc.Namespace == item.Namespace && publishSvc.Name == item.Name {
// item copy because of ADDED handler in events.go which must modify the STATUS based solely on addresses
itemCopy := *item
itemCopy.Addresses = getServiceAddresses(data)
channel <- SyncDataEvent{SyncType: PUBLISH_SERVICE, Namespace: item.Namespace, Data: &itemCopy}
}
},
DeleteFunc: func(obj interface{}) {
Expand All @@ -499,8 +502,9 @@ func (k *K8s) EventsServices(channel chan SyncDataEvent, ingressChan chan ingres
}
k.Logger.Tracef("%s %s: %s", SERVICE, item.Status, item.Name)
channel <- SyncDataEvent{SyncType: SERVICE, Namespace: item.Namespace, Data: item}
if publishSvc != nil && publishSvc.Namespace == data.Namespace && publishSvc.Name == data.Name {
ingressChan <- ingress.Sync{Service: data}
if publishSvc != nil && publishSvc.Namespace == item.Namespace && publishSvc.Name == item.Name {
item.Addresses = getServiceAddresses(data)
channel <- SyncDataEvent{SyncType: PUBLISH_SERVICE, Namespace: data.Namespace, Data: item}
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
Expand All @@ -522,9 +526,7 @@ func (k *K8s) EventsServices(channel chan SyncDataEvent, ingressChan chan ingres
k.Logger.Tracef("forwarding to ExternalName Services for %v is disabled", data2)
return
}
if publishSvc != nil && publishSvc.Namespace == data2.Namespace && publishSvc.Name == data2.Name {
ingressChan <- ingress.Sync{Service: data2}
}

status := MODIFIED
item1 := &store.Service{
Namespace: data1.GetNamespace(),
Expand Down Expand Up @@ -564,8 +566,15 @@ func (k *K8s) EventsServices(channel chan SyncDataEvent, ingressChan chan ingres
if item2.Equal(item1) {
return
}
k.Logger.Tracef("%s %s: %s", SERVICE, item2.Status, item2.Name)
channel <- SyncDataEvent{SyncType: SERVICE, Namespace: item2.Namespace, Data: item2}

if !item2.Equal(item1) {
k.Logger.Tracef("%s %s: %s", SERVICE, item2.Status, item2.Name)
channel <- SyncDataEvent{SyncType: SERVICE, Namespace: item2.Namespace, Data: item2}
}
if publishSvc != nil && publishSvc.Namespace == item2.Namespace && publishSvc.Name == item2.Name {
item2.Addresses = getServiceAddresses(data2)
channel <- SyncDataEvent{SyncType: PUBLISH_SERVICE, Namespace: item2.Namespace, Data: item2}
}
},
})
go informer.Run(stop)
Expand Down Expand Up @@ -766,3 +775,34 @@ func (k *K8s) IsNetworkingV1ApiSupported() bool {

return major == 1 && minor >= 19
}

func getServiceAddresses(service *corev1.Service) (addresses []string) {
switch service.Spec.Type {
case corev1.ServiceTypeExternalName:
addresses = []string{service.Spec.ExternalName}
case corev1.ServiceTypeClusterIP:
addresses = []string{service.Spec.ClusterIP}
case corev1.ServiceTypeNodePort:
if service.Spec.ExternalIPs != nil {
addresses = append(addresses, service.Spec.ExternalIPs...)
} else {
addresses = append(addresses, service.Spec.ClusterIP)
}
case corev1.ServiceTypeLoadBalancer:
for _, ip := range service.Status.LoadBalancer.Ingress {
if ip.IP == "" {
addresses = append(addresses, ip.Hostname)
} else {
addresses = append(addresses, ip.IP)
}
}
addresses = append(addresses, service.Spec.ExternalIPs...)
default:
logger.Errorf("Unable to extract IP address/es from service %s/%s", service.Namespace, service.Name)
return
}
if addresses == nil {
addresses = []string{}
}
return
}
2 changes: 2 additions & 0 deletions controller/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func (c *HAProxyController) SyncData() {
change = c.Store.EventSecret(ns, job.Data.(*store.Secret))
case POD:
change = c.Store.EventPod(job.Data.(store.PodEvent))
case PUBLISH_SERVICE:
change = c.Store.EventPublishService(ns, job.Data.(*store.Service))
}
hadChanges = hadChanges || change
}
Expand Down
3 changes: 3 additions & 0 deletions controller/service/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ import (
// HandleHAProxySrvs handles the haproxy backend servers of the corresponding IngressPath (service + port)
func (s *Service) HandleHAProxySrvs(client api.HAProxyClient, store store.K8s) (reload bool) {
var srvsScaled bool

backend, err := s.getRuntimeBackend(store)

if err != nil {
logger.Warningf("Ingress '%s/%s': %s", s.resource.Namespace, s.resource.Name, err)
if servers, _ := client.BackendServersGet(s.backend.Name); servers != nil {
Expand All @@ -47,6 +49,7 @@ func (s *Service) HandleHAProxySrvs(client api.HAProxyClient, store store.K8s) (
s.updateHAProxySrv(client, *srvSlot, backend.Endpoints.Port)
}
}

if backend.DynUpdateFailed {
backend.DynUpdateFailed = false
return true
Expand Down
Loading

0 comments on commit ea7b243

Please sign in to comment.