Merge pull request #187 from nicksardo/cherrypick-mci-fix
Cherry-pick checkpoint changes to 1.0
bowei authored Apr 3, 2018
2 parents 33b231c + 5a7a717 commit d9761c0
Showing 4 changed files with 107 additions and 106 deletions.
77 changes: 34 additions & 43 deletions pkg/controller/cluster_manager.go
@@ -84,7 +84,7 @@ func (c *ClusterManager) shutdown() error {
return err
}
if err := c.firewallPool.Shutdown(); err != nil {
if _, ok := err.(*firewalls.FirewallSyncError); ok {
if _, ok := err.(*firewalls.FirewallXPNError); ok {
return nil
}
return err
@@ -93,61 +93,52 @@ func (c *ClusterManager) shutdown() error {
return c.backendPool.Shutdown()
}

// Checkpoint performs a checkpoint with the cloud.
// - lbs are the single cluster L7 loadbalancers we wish to exist. If they already
// EnsureLoadBalancer creates the backend services and higher-level LB resources.
// - lb is the single cluster L7 loadbalancers we wish to exist. If they already
// exist, they should not have any broken links between say, a UrlMap and
// TargetHttpProxy.
// - nodeNames are the names of nodes we wish to add to all loadbalancer
// instance groups.
// - backendServicePorts are the ports for which we require BackendServices.
// - namedPorts are the ports which must be opened on instance groups.
// - firewallPorts are the ports which must be opened in the firewall rule.
// Returns the list of all instance groups corresponding to the given loadbalancers.
// If in performing the checkpoint the cluster manager runs out of quota, a
// googleapi 403 is returned.
func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, endpointPorts []string) ([]*compute.InstanceGroup, error) {
glog.V(4).Infof("Checkpoint(%v lbs, %v nodeNames, %v backendServicePorts, %v namedPorts, %v endpointPorts)", len(lbs), len(nodeNames), len(backendServicePorts), len(namedPorts), len(endpointPorts))

if len(namedPorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
namedPorts = append(namedPorts, c.defaultBackendNodePort)
}
// Multiple ingress paths can point to the same service (and hence nodePort)
// but each nodePort can only have one set of cloud resources behind it. So
// don't waste time double validating GCE BackendServices.
namedPorts = uniq(namedPorts)
backendServicePorts = uniq(backendServicePorts)
// Create Instance Groups.
igs, err := c.EnsureInstanceGroupsAndPorts(namedPorts)
if err != nil {
return igs, err
}
if err := c.backendPool.Ensure(backendServicePorts, igs); err != nil {
return igs, err
}
if err := c.instancePool.Sync(nodeNames); err != nil {
return igs, err
}
if err := c.l7Pool.Sync(lbs); err != nil {
return igs, err
}

if err := c.firewallPool.Sync(nodeNames, endpointPorts...); err != nil {
return igs, err
// - lbServicePorts are the ports for which we require Backend Services.
// - instanceGroups are the groups to be referenced by the Backend Services.
// If GCE runs out of quota, a googleapi 403 is returned.
func (c *ClusterManager) EnsureLoadBalancer(lb *loadbalancers.L7RuntimeInfo, lbServicePorts []backends.ServicePort, instanceGroups []*compute.InstanceGroup) error {
glog.V(4).Infof("EnsureLoadBalancer(%q lb, %v lbServicePorts, %v instanceGroups)", lb.String(), len(lbServicePorts), len(instanceGroups))
if err := c.backendPool.Ensure(uniq(lbServicePorts), instanceGroups); err != nil {
return err
}

return igs, nil
return c.l7Pool.Sync([]*loadbalancers.L7RuntimeInfo{lb})
}

func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
if len(servicePorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
servicePorts = append(servicePorts, c.defaultBackendNodePort)
}

// Convert to slice of NodePort int64s.
ports := []int64{}
for _, p := range servicePorts {
for _, p := range uniq(servicePorts) {
ports = append(ports, p.NodePort)
}

// Create instance groups and set named ports.
igs, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports)
if err != nil {
return nil, err
}

// Add/remove instances to the instance groups.
if err = c.instancePool.Sync(nodeNames); err != nil {
return nil, err
}

return igs, err
}

func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string) error {
return c.firewallPool.Sync(nodeNames, endpointPorts...)
}

// GC garbage collects unused resources.
// - lbNames are the names of L7 loadbalancers we wish to exist. Those not in
// this list are removed from the cloud.
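
Taken together, the cluster_manager.go changes replace the monolithic Checkpoint with three narrower entry points. The sketch below is not part of this commit: the method signatures are taken from the diff above, while the package name, the wrapper function ensureIngress, and the import paths are assumptions for illustration of the call order the controller now follows.

package sketch

import (
	compute "google.golang.org/api/compute/v1"

	"k8s.io/ingress-gce/pkg/backends"
	"k8s.io/ingress-gce/pkg/controller"
	"k8s.io/ingress-gce/pkg/loadbalancers"
)

// ensureIngress is a hypothetical wrapper illustrating how the controller
// now drives the split-out ClusterManager methods for a single ingress.
func ensureIngress(cm *controller.ClusterManager, lb *loadbalancers.L7RuntimeInfo,
	nodeNames []string, allNodePorts, lbServicePorts []backends.ServicePort,
	negEndpointPorts []string) ([]*compute.InstanceGroup, error) {

	// Instance groups are shared by every ingress in the cluster, so they are
	// ensured from the full set of node ports and synced with the current nodes.
	igs, err := cm.EnsureInstanceGroupsAndPorts(nodeNames, allNodePorts)
	if err != nil {
		return nil, err
	}

	// Backend services and the higher-level L7 resources for this one ingress.
	if err := cm.EnsureLoadBalancer(lb, lbServicePorts, igs); err != nil {
		return igs, err
	}

	// Firewall rule for the cluster, including any NEG endpoint ports.
	return igs, cm.EnsureFirewall(nodeNames, negEndpointPorts)
}
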
122 changes: 66 additions & 56 deletions pkg/controller/controller.go
@@ -230,8 +230,8 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
return nil
}

// sync manages Ingress create/updates/deletes.
func (lbc *LoadBalancerController) sync(key string) (err error) {
// sync manages Ingress create/updates/deletes
func (lbc *LoadBalancerController) sync(key string) (retErr error) {
if !lbc.hasSynced() {
time.Sleep(storeSyncPollPeriod)
return fmt.Errorf("waiting for stores to sync")
@@ -247,90 +247,96 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
return err
}

// allNodePorts contains ServicePorts used by all ingresses (single-cluster and multi-cluster).
allNodePorts := lbc.Translator.ToNodePorts(&allIngresses)
// gceNodePorts contains the ServicePorts used by only single-cluster ingress.
gceNodePorts := lbc.Translator.ToNodePorts(&gceIngresses)
nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister))
if err != nil {
return err
}
lbNames := lbc.ingLister.Store.ListKeys()

lbNames := lbc.ingLister.Store.ListKeys()
obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
if err != nil {
return err
}

if !ingExists {
glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key)
return lbc.CloudClusterManager.GC(lbNames, allNodePorts)
// GC will find GCE resources that were used for this ingress and delete them.
return lbc.CloudClusterManager.GC(lbNames, gceNodePorts)
}

// Get ingress and DeepCopy for assurance that we don't pollute other goroutines with changes.
ing, ok := obj.(*extensions.Ingress)
if !ok {
return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj)
}
// DeepCopy for assurance that we don't pollute other goroutines with changes.
ing = ing.DeepCopy()

// This performs a 2 phase checkpoint with the cloud:
// * Phase 1 creates/verifies resources are as expected. At the end of a
// successful checkpoint we know that existing L7s are WAI, and the L7
// for the Ingress associated with "key" is ready for a UrlMap update.
// If this encounters an error, eg for quota reasons, we want to invoke
// Phase 2 right away and retry checkpointing.
// * Phase 2 performs GC by refcounting shared resources. This needs to
// happen periodically whether or not stage 1 fails. At the end of a
// successful GC we know that there are no dangling cloud resources that
// don't have an associated Kubernetes Ingress/Service/Endpoint.

var syncError error
defer func() {
if deferErr := lbc.CloudClusterManager.GC(lbNames, allNodePorts); deferErr != nil {
err = fmt.Errorf("error during sync %v, error during GC %v", syncError, deferErr)
if retErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", retErr.Error())
}
// Garbage collection will occur regardless of an error occurring. If an error occurred,
// it could have been caused by quota issues; therefore, garbage collecting now may
// free up enough quota for the next sync to pass.
if gcErr := lbc.CloudClusterManager.GC(lbNames, gceNodePorts); gcErr != nil {
retErr = fmt.Errorf("error during sync %v, error during GC %v", retErr, gcErr)
}
glog.V(3).Infof("Finished syncing %v", key)
}()

singleIngressList := &extensions.IngressList{
Items: []extensions.Ingress{*ing},
igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, allNodePorts)
if err != nil {
return err
}

if isGCEMultiClusterIngress(ing) {
// Add instance group names as annotation on the ingress and return.
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
}
if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil {
return err
}
if err = updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations); err != nil {
return err
}
glog.V(3).Infof("Finished syncing MCI-ingress %v", key)
return nil
}

// Continue syncing this specific GCE ingress.
lb, err := lbc.toRuntimeInfo(ing)
if err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Ingress", err.Error())
return err
}
lbs := []*loadbalancers.L7RuntimeInfo{lb}

// Get all service ports for the ingress being synced.
ingSvcPorts := lbc.Translator.ToNodePorts(singleIngressList)
lbSvcPorts := lbc.Translator.ToNodePorts(&extensions.IngressList{
Items: []extensions.Ingress{*ing},
})

igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, ingSvcPorts, allNodePorts, lbc.Translator.GatherEndpointPorts(gceNodePorts))
if err != nil {
const eventMsg = "GCE"
if fwErr, ok := err.(*firewalls.FirewallSyncError); ok {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, eventMsg, fwErr.Message)
} else {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, eventMsg, err.Error())
syncError = err
}
// Create the backend services and higher-level LB resources.
if err = lbc.CloudClusterManager.EnsureLoadBalancer(lb, lbSvcPorts, igs); err != nil {
return err
}

if isGCEMultiClusterIngress(ing) {
// Add instance group names as annotation on the ingress.
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
}
if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil {
negEndpointPorts := lbc.Translator.GatherEndpointPorts(gceNodePorts)
// Ensure firewall rule for the cluster and pass any NEG endpoint ports.
if err = lbc.CloudClusterManager.EnsureFirewall(nodeNames, negEndpointPorts); err != nil {
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
// XPN: Raise an event and ignore the error.
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, "XPN", fwErr.Message)
} else {
return err
}
return updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations)
}

// If NEG enabled, link the backend services to the NEGs.
if lbc.negEnabled {
svcPorts := lbc.Translator.ToNodePorts(singleIngressList)
for _, svcPort := range svcPorts {
for _, svcPort := range lbSvcPorts {
if svcPort.NEGEnabled {

zones, err := lbc.Translator.ListZones()
if err != nil {
return err
@@ -345,20 +351,24 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
// Update the UrlMap of the single loadbalancer that came through the watch.
l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
if err != nil {
syncError = fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err)
return syncError
return fmt.Errorf("unable to get loadbalancer: %v", err)
}

urlMap, err := lbc.Translator.ToURLMap(ing)
if err != nil {
return fmt.Errorf("convert to URL Map error %v", err)
}

if urlMap, err := lbc.Translator.ToURLMap(ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
if err := l7.UpdateUrlMap(urlMap); err != nil {
return fmt.Errorf("update URL Map error: %v", err)
}
return syncError

if err := lbc.updateIngressStatus(l7, ing); err != nil {
return fmt.Errorf("update ingress status error: %v", err)
}

glog.V(3).Infof("Finished syncing %v", key)
return nil
}

// updateIngressStatus updates the IP and annotations of a loadbalancer.
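
The restructured sync keeps its garbage-collection guarantee through a named return value (retErr) and a deferred GC call: GC runs whether or not the main work succeeded, and a GC failure is folded into the returned error. As the new comment notes, this matters for quota failures, since collecting garbage may free enough quota for the next sync to pass. A minimal, self-contained sketch of that pattern, with hypothetical names:

package sketch

import "fmt"

// runWithGC illustrates the named-return + deferred-GC pattern used by sync():
// gc always runs, and a GC failure is combined with whatever error the main
// work produced. The function and parameter names here are hypothetical.
func runWithGC(work, gc func() error) (retErr error) {
	defer func() {
		if gcErr := gc(); gcErr != nil {
			retErr = fmt.Errorf("error during sync %v, error during GC %v", retErr, gcErr)
		}
	}()
	return work()
}
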
8 changes: 4 additions & 4 deletions pkg/firewalls/firewalls.go
@@ -147,19 +147,19 @@ func (fr *FirewallRules) deleteFirewall(name string) error {
return err
}

func newFirewallXPNError(internal error, cmd string) *FirewallSyncError {
return &FirewallSyncError{
func newFirewallXPNError(internal error, cmd string) *FirewallXPNError {
return &FirewallXPNError{
Internal: internal,
Message: fmt.Sprintf("Firewall change required by network admin: `%v`", cmd),
}
}

type FirewallSyncError struct {
type FirewallXPNError struct {
Internal error
Message string
}

func (f *FirewallSyncError) Error() string {
func (f *FirewallXPNError) Error() string {
return f.Message
}

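Renaming FirewallSyncError to FirewallXPNError makes the error's meaning explicit: on a Shared VPC (XPN) cluster the controller cannot modify firewall rules itself, so it surfaces the command a network admin must run instead of failing the sync. A hedged sketch of the caller-side handling this commit standardizes on; the helper name and import path are assumptions, while the type and its Message field come from the diff above.

package sketch

import (
	"log"

	"k8s.io/ingress-gce/pkg/firewalls"
)

// ignoreXPN is a hypothetical helper mirroring how the controller and the
// cluster manager now treat the renamed error: an XPN error is informational
// (the network admin must apply the change), anything else is a real failure.
func ignoreXPN(err error) error {
	if err == nil {
		return nil
	}
	if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
		log.Printf("firewall change required by network admin: %v", fwErr.Message)
		return nil
	}
	return err
}
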
6 changes: 3 additions & 3 deletions pkg/firewalls/firewalls_test.go
@@ -165,7 +165,7 @@ func TestSyncXPNReadOnly(t *testing.T) {
nodes := []string{"node-a", "node-b", "node-c"}

err := fp.Sync(nodes)
if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "create") {
if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "create") {
t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
}

@@ -193,12 +193,12 @@ func TestSyncXPNReadOnly(t *testing.T) {

nodes = append(nodes, "node-d")
err = fp.Sync(nodes)
if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "update") {
if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "update") {
t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
}

err = fp.Shutdown()
if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "delete") {
if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "delete") {
t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
}
}
