diff --git a/pkg/controller/cluster_manager.go b/pkg/controller/cluster_manager.go
index b9484fe252..39a79c28fd 100644
--- a/pkg/controller/cluster_manager.go
+++ b/pkg/controller/cluster_manager.go
@@ -84,7 +84,7 @@ func (c *ClusterManager) shutdown() error {
 		return err
 	}
 	if err := c.firewallPool.Shutdown(); err != nil {
-		if _, ok := err.(*firewalls.FirewallSyncError); ok {
+		if _, ok := err.(*firewalls.FirewallXPNError); ok {
 			return nil
 		}
 		return err
@@ -93,61 +93,52 @@ func (c *ClusterManager) shutdown() error {
 	return c.backendPool.Shutdown()
 }
 
-// Checkpoint performs a checkpoint with the cloud.
-// - lbs are the single cluster L7 loadbalancers we wish to exist. If they already
-// exist, they should not have any broken links between say, a UrlMap and
+// EnsureLoadBalancer creates the backend services and higher-level LB resources.
+// - lb is the single cluster L7 loadbalancer we wish to exist. If it already
+// exists, it should not have any broken links between say, a UrlMap and
 // TargetHttpProxy.
-// - nodeNames are the names of nodes we wish to add to all loadbalancer
-// instance groups.
-// - backendServicePorts are the ports for which we require BackendServices.
-// - namedPorts are the ports which must be opened on instance groups.
-// - firewallPorts are the ports which must be opened in the firewall rule.
-// Returns the list of all instance groups corresponding to the given loadbalancers.
-// If in performing the checkpoint the cluster manager runs out of quota, a
-// googleapi 403 is returned.
-func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, endpointPorts []string) ([]*compute.InstanceGroup, error) {
-	glog.V(4).Infof("Checkpoint(%v lbs, %v nodeNames, %v backendServicePorts, %v namedPorts, %v endpointPorts)", len(lbs), len(nodeNames), len(backendServicePorts), len(namedPorts), len(endpointPorts))
-
-	if len(namedPorts) != 0 {
-		// Add the default backend node port to the list of named ports for instance groups.
-		namedPorts = append(namedPorts, c.defaultBackendNodePort)
-	}
-	// Multiple ingress paths can point to the same service (and hence nodePort)
-	// but each nodePort can only have one set of cloud resources behind it. So
-	// don't waste time double validating GCE BackendServices.
-	namedPorts = uniq(namedPorts)
-	backendServicePorts = uniq(backendServicePorts)
-	// Create Instance Groups.
-	igs, err := c.EnsureInstanceGroupsAndPorts(namedPorts)
-	if err != nil {
-		return igs, err
-	}
-	if err := c.backendPool.Ensure(backendServicePorts, igs); err != nil {
-		return igs, err
-	}
-	if err := c.instancePool.Sync(nodeNames); err != nil {
-		return igs, err
-	}
-	if err := c.l7Pool.Sync(lbs); err != nil {
-		return igs, err
-	}
-
-	if err := c.firewallPool.Sync(nodeNames, endpointPorts...); err != nil {
-		return igs, err
+// - lbServicePorts are the ports for which we require Backend Services.
+// - instanceGroups are the groups to be referenced by the Backend Services.
+// If GCE runs out of quota, a googleapi 403 is returned.
+func (c *ClusterManager) EnsureLoadBalancer(lb *loadbalancers.L7RuntimeInfo, lbServicePorts []backends.ServicePort, instanceGroups []*compute.InstanceGroup) error {
+	glog.V(4).Infof("EnsureLoadBalancer(%q lb, %v lbServicePorts, %v instanceGroups)", lb.String(), len(lbServicePorts), len(instanceGroups))
+	if err := c.backendPool.Ensure(uniq(lbServicePorts), instanceGroups); err != nil {
+		return err
 	}
-	return igs, nil
+	return c.l7Pool.Sync([]*loadbalancers.L7RuntimeInfo{lb})
 }
 
-func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
+func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
+	if len(servicePorts) != 0 {
+		// Add the default backend node port to the list of named ports for instance groups.
+		servicePorts = append(servicePorts, c.defaultBackendNodePort)
+	}
+
+	// Convert to slice of NodePort int64s.
 	ports := []int64{}
-	for _, p := range servicePorts {
+	for _, p := range uniq(servicePorts) {
 		ports = append(ports, p.NodePort)
 	}
+
+	// Create instance groups and set named ports.
 	igs, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports)
+	if err != nil {
+		return nil, err
+	}
+
+	// Add/remove instances to the instance groups.
+	if err = c.instancePool.Sync(nodeNames); err != nil {
+		return nil, err
+	}
+
 	return igs, err
 }
 
+func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string) error {
+	return c.firewallPool.Sync(nodeNames, endpointPorts...)
+}
+
 // GC garbage collects unused resources.
 // - lbNames are the names of L7 loadbalancers we wish to exist. Those not in
 // this list are removed from the cloud.
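For reviewers: a minimal, runnable sketch of the port handling that now lives in `EnsureInstanceGroupsAndPorts`: append the default backend's node port, dedupe, then flatten to the int64 slice handed to the instance pool. `ServicePort` and `uniq` are pared-down stand-ins for the real `backends.ServicePort` and the controller's `uniq` helper, and the port numbers are made up.

```go
package main

import "fmt"

// ServicePort is a minimal stand-in for backends.ServicePort; only the
// NodePort field matters for instance-group named ports.
type ServicePort struct{ NodePort int64 }

// uniq mirrors the controller's helper: multiple ingress paths can point at
// the same service (and hence NodePort), but each NodePort only needs one
// named port on the instance groups.
func uniq(ports []ServicePort) []ServicePort {
	seen := map[int64]bool{}
	var out []ServicePort
	for _, p := range ports {
		if !seen[p.NodePort] {
			seen[p.NodePort] = true
			out = append(out, p)
		}
	}
	return out
}

func main() {
	defaultBackendNodePort := ServicePort{NodePort: 30000}
	svcPorts := []ServicePort{{NodePort: 31001}, {NodePort: 31001}, {NodePort: 31002}}

	// As in EnsureInstanceGroupsAndPorts: add the default backend port,
	// dedupe, then flatten to the int64 slice passed to the instance pool.
	svcPorts = append(svcPorts, defaultBackendNodePort)
	ports := []int64{}
	for _, p := range uniq(svcPorts) {
		ports = append(ports, p.NodePort)
	}
	fmt.Println(ports) // [31001 31002 30000]
}
```

Deduplication matters because each NodePort can only have one set of cloud resources behind it, so there is no point in double-validating the same port.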
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 7d60ec8694..a4194ebd9c 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -230,8 +230,8 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
 	return nil
 }
 
 // sync manages Ingress create/updates/deletes.
-func (lbc *LoadBalancerController) sync(key string) (err error) {
+func (lbc *LoadBalancerController) sync(key string) (retErr error) {
 	if !lbc.hasSynced() {
 		time.Sleep(storeSyncPollPeriod)
 		return fmt.Errorf("waiting for stores to sync")
@@ -247,90 +247,96 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
 		return err
 	}
 
+	// allNodePorts contains ServicePorts used by all ingresses (single-cluster and multi-cluster).
 	allNodePorts := lbc.Translator.ToNodePorts(&allIngresses)
+	// gceNodePorts contains the ServicePorts used by only single-cluster ingresses.
 	gceNodePorts := lbc.Translator.ToNodePorts(&gceIngresses)
 	nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister))
 	if err != nil {
 		return err
 	}
-	lbNames := lbc.ingLister.Store.ListKeys()
 
+	lbNames := lbc.ingLister.Store.ListKeys()
 	obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
 	if err != nil {
 		return err
 	}
-
 	if !ingExists {
 		glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key)
-		return lbc.CloudClusterManager.GC(lbNames, allNodePorts)
+		// GC will find GCE resources that were used for this ingress and delete them.
+		return lbc.CloudClusterManager.GC(lbNames, gceNodePorts)
 	}
 
+	// Get ingress and DeepCopy for assurance that we don't pollute other goroutines with changes.
 	ing, ok := obj.(*extensions.Ingress)
 	if !ok {
 		return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj)
 	}
-	// DeepCopy for assurance that we don't pollute other goroutines with changes.
 	ing = ing.DeepCopy()
 
-	// This performs a 2 phase checkpoint with the cloud:
-	// * Phase 1 creates/verifies resources are as expected. At the end of a
-	//   successful checkpoint we know that existing L7s are WAI, and the L7
-	//   for the Ingress associated with "key" is ready for a UrlMap update.
-	//   If this encounters an error, eg for quota reasons, we want to invoke
-	//   Phase 2 right away and retry checkpointing.
-	// * Phase 2 performs GC by refcounting shared resources. This needs to
-	//   happen periodically whether or not stage 1 fails. At the end of a
-	//   successful GC we know that there are no dangling cloud resources that
-	//   don't have an associated Kubernetes Ingress/Service/Endpoint.
-
-	var syncError error
 	defer func() {
-		if deferErr := lbc.CloudClusterManager.GC(lbNames, allNodePorts); deferErr != nil {
-			err = fmt.Errorf("error during sync %v, error during GC %v", syncError, deferErr)
+		if retErr != nil {
+			lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", retErr.Error())
+		}
+		// Garbage collection runs whether or not an error occurred. If there was an error, it
+		// may have been caused by quota issues; garbage collecting now may free up enough
+		// quota for the next sync to pass.
+		if gcErr := lbc.CloudClusterManager.GC(lbNames, gceNodePorts); gcErr != nil {
+			retErr = fmt.Errorf("error during sync %v, error during GC %v", retErr, gcErr)
 		}
-		glog.V(3).Infof("Finished syncing %v", key)
 	}()
 
-	singleIngressList := &extensions.IngressList{
-		Items: []extensions.Ingress{*ing},
+	igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, allNodePorts)
+	if err != nil {
+		return err
+	}
+
+	if isGCEMultiClusterIngress(ing) {
+		// Add instance group names as annotation on the ingress and return.
+		if ing.Annotations == nil {
+			ing.Annotations = map[string]string{}
+		}
+		if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil {
+			return err
+		}
+		if err = updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations); err != nil {
+			return err
+		}
+		glog.V(3).Infof("Finished syncing MCI-ingress %v", key)
+		return nil
 	}
+
+	// Continue syncing this specific GCE ingress.
 	lb, err := lbc.toRuntimeInfo(ing)
 	if err != nil {
-		lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Ingress", err.Error())
 		return err
 	}
-	lbs := []*loadbalancers.L7RuntimeInfo{lb}
 
 	// Get all service ports for the ingress being synced.
-	ingSvcPorts := lbc.Translator.ToNodePorts(singleIngressList)
+	lbSvcPorts := lbc.Translator.ToNodePorts(&extensions.IngressList{
+		Items: []extensions.Ingress{*ing},
+	})
 
-	igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, ingSvcPorts, allNodePorts, lbc.Translator.GatherEndpointPorts(gceNodePorts))
-	if err != nil {
-		const eventMsg = "GCE"
-		if fwErr, ok := err.(*firewalls.FirewallSyncError); ok {
-			lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, eventMsg, fwErr.Message)
-		} else {
-			lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, eventMsg, err.Error())
-			syncError = err
-		}
+	// Create the backend services and higher-level LB resources.
+	if err = lbc.CloudClusterManager.EnsureLoadBalancer(lb, lbSvcPorts, igs); err != nil {
+		return err
 	}
 
-	if isGCEMultiClusterIngress(ing) {
-		// Add instance group names as annotation on the ingress.
-		if ing.Annotations == nil {
-			ing.Annotations = map[string]string{}
-		}
-		if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil {
+	negEndpointPorts := lbc.Translator.GatherEndpointPorts(gceNodePorts)
+	// Ensure firewall rule for the cluster and pass any NEG endpoint ports.
+	if err = lbc.CloudClusterManager.EnsureFirewall(nodeNames, negEndpointPorts); err != nil {
+		if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
+			// XPN: Raise an event and ignore the error.
+			lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, "XPN", fwErr.Message)
+		} else {
 			return err
 		}
-		return updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations)
 	}
 
+	// If NEG enabled, link the backend services to the NEGs.
 	if lbc.negEnabled {
-		svcPorts := lbc.Translator.ToNodePorts(singleIngressList)
-		for _, svcPort := range svcPorts {
+		for _, svcPort := range lbSvcPorts {
 			if svcPort.NEGEnabled {
-
 				zones, err := lbc.Translator.ListZones()
 				if err != nil {
 					return err
@@ -345,20 +351,24 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
 	// Update the UrlMap of the single loadbalancer that came through the watch.
 	l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
 	if err != nil {
-		syncError = fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err)
-		return syncError
+		return fmt.Errorf("unable to get loadbalancer: %v", err)
+	}
+
+	urlMap, err := lbc.Translator.ToURLMap(ing)
+	if err != nil {
+		return fmt.Errorf("convert to URL Map error: %v", err)
 	}
-	if urlMap, err := lbc.Translator.ToURLMap(ing); err != nil {
-		syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
-	} else if err := l7.UpdateUrlMap(urlMap); err != nil {
-		lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
-		syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
-	} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
-		lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
-		syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
+	if err := l7.UpdateUrlMap(urlMap); err != nil {
+		return fmt.Errorf("update URL Map error: %v", err)
 	}
-	return syncError
+
+	if err := lbc.updateIngressStatus(l7, ing); err != nil {
+		return fmt.Errorf("update ingress status error: %v", err)
+	}
+
+	glog.V(3).Infof("Finished syncing %v", key)
+	return nil
 }
 
 // updateIngressStatus updates the IP and annotations of a loadbalancer.
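The reworked `sync` leans on a named return value (`retErr`) so the deferred GC both runs unconditionally and can fold a GC failure into the error returned to the caller. A standalone sketch of that pattern, with a failing `gc` standing in for `CloudClusterManager.GC` (both error messages are invented for the demo):

```go
package main

import "fmt"

// gc is a stand-in for CloudClusterManager.GC.
func gc() error { return fmt.Errorf("quota exceeded") }

// sync demonstrates the named-return pattern the controller now uses: the
// deferred GC always runs, and if it fails its error is combined with retErr.
func sync() (retErr error) {
	defer func() {
		if gcErr := gc(); gcErr != nil {
			retErr = fmt.Errorf("error during sync %v, error during GC %v", retErr, gcErr)
		}
	}()
	return fmt.Errorf("backend service create failed")
}

func main() {
	fmt.Println(sync())
	// error during sync backend service create failed, error during GC quota exceeded
}
```

Because the defer mutates `retErr`, the GC error is never silently dropped, yet GC still runs even when the main body returns early; this is why an error that was caused by quota exhaustion can be cleaned up before the next sync attempt.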
diff --git a/pkg/firewalls/firewalls.go b/pkg/firewalls/firewalls.go
index 704a8413c9..a0abdc59c9 100644
--- a/pkg/firewalls/firewalls.go
+++ b/pkg/firewalls/firewalls.go
@@ -147,19 +147,19 @@ func (fr *FirewallRules) deleteFirewall(name string) error {
 	return err
 }
 
-func newFirewallXPNError(internal error, cmd string) *FirewallSyncError {
-	return &FirewallSyncError{
+func newFirewallXPNError(internal error, cmd string) *FirewallXPNError {
+	return &FirewallXPNError{
 		Internal: internal,
 		Message:  fmt.Sprintf("Firewall change required by network admin: `%v`", cmd),
 	}
 }
 
-type FirewallSyncError struct {
+type FirewallXPNError struct {
 	Internal error
 	Message  string
 }
 
-func (f *FirewallSyncError) Error() string {
+func (f *FirewallXPNError) Error() string {
 	return f.Message
 }
 
diff --git a/pkg/firewalls/firewalls_test.go b/pkg/firewalls/firewalls_test.go
index 80cbf4054e..5a794ea152 100644
--- a/pkg/firewalls/firewalls_test.go
+++ b/pkg/firewalls/firewalls_test.go
@@ -165,7 +165,7 @@ func TestSyncXPNReadOnly(t *testing.T) {
 	nodes := []string{"node-a", "node-b", "node-c"}
 
 	err := fp.Sync(nodes)
-	if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "create") {
+	if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "create") {
 		t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
 	}
 
@@ -193,12 +193,12 @@ func TestSyncXPNReadOnly(t *testing.T) {
 	nodes = append(nodes, "node-d")
 
 	err = fp.Sync(nodes)
-	if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "update") {
+	if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "update") {
 		t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
 	}
 
 	err = fp.Shutdown()
-	if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "delete") {
+	if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "delete") {
 		t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
 	}
 }
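From a caller's point of view, the rename matters because the error type is part of the package's contract. The sketch below copies the `FirewallXPNError` definition from this diff and shows the type assertion that `sync` and `shutdown` rely on to treat it as non-fatal; the `main` wrapper and the example gcloud command are illustrative only.

```go
package main

import "fmt"

// FirewallXPNError mirrors the renamed type: in a Shared VPC (XPN) setup the
// controller cannot mutate firewalls itself, so it surfaces the command a
// network admin must run instead of failing the sync.
type FirewallXPNError struct {
	Internal error
	Message  string
}

func (f *FirewallXPNError) Error() string { return f.Message }

func newFirewallXPNError(internal error, cmd string) *FirewallXPNError {
	return &FirewallXPNError{
		Internal: internal,
		Message:  fmt.Sprintf("Firewall change required by network admin: `%v`", cmd),
	}
}

func main() {
	var err error = newFirewallXPNError(fmt.Errorf("403"), "gcloud compute firewall-rules create ...")

	// Callers treat this error type as non-fatal: sync raises a Normal event
	// and continues, and shutdown ignores it entirely.
	if fwErr, ok := err.(*FirewallXPNError); ok {
		fmt.Println("event:", fwErr.Message)
	}
}
```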