From d0d8c133c209146fd7794091897a95f834cdb04b Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Mon, 12 Mar 2018 17:38:25 -0700 Subject: [PATCH 1/4] Initial ingress rate limiting implementation --- cmd/glbc/app/clients.go | 7 + pkg/flags/flags.go | 26 ++++ pkg/ratelimit/ratelimit.go | 142 ++++++++++++++++++ pkg/ratelimit/ratelimit_test.go | 55 +++++++ .../pkg/cloudprovider/providers/gce/gce.go | 17 ++- 5 files changed, 245 insertions(+), 2 deletions(-) create mode 100644 pkg/ratelimit/ratelimit.go create mode 100644 pkg/ratelimit/ratelimit_test.go diff --git a/cmd/glbc/app/clients.go b/cmd/glbc/app/clients.go index db4be77df5..88e5b56b7f 100644 --- a/cmd/glbc/app/clients.go +++ b/cmd/glbc/app/clients.go @@ -36,6 +36,7 @@ import ( _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "k8s.io/ingress-gce/pkg/flags" + "k8s.io/ingress-gce/pkg/ratelimit" "k8s.io/ingress-gce/pkg/utils" ) @@ -95,6 +96,12 @@ func NewGCEClient() *gce.GCECloud { provider, err := cloudprovider.GetCloudProvider("gce", configReader()) if err == nil { cloud := provider.(*gce.GCECloud) + // Configure GCE rate limiting + rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values()) + if err != nil { + glog.Errorf("Error in configuring rate limiting: %v", err) + } + cloud.SetRateLimiter(rl) // If this controller is scheduled on a node without compute/rw // it won't be allowed to list backends. We can assume that the // user has no need for Ingress in this case. If they grant diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 2e5abf97f0..9e72b746e8 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -43,6 +43,7 @@ var ( ConfigFilePath string DefaultSvc string DeleteAllOnQuit bool + GCERateLimit RateLimitSpecs HealthCheckPath string HealthzPort int InCluster bool @@ -58,6 +59,7 @@ var ( func init() { F.NodePortRanges.ports = []string{DefaultNodePortRange} + F.GCERateLimit.specs = []string{} } // Register flags with the command line parser. @@ -85,6 +87,11 @@ the default backend.`) external cloud resources as it's shutting down. Mostly used for testing. In normal environments the controller should only delete a loadbalancer if the associated Ingress is deleted.`) + flag.Var(&F.GCERateLimit, "gce-ratelimit", + `Optional, can be used to rate limit certain GCE API calls. Example usage: +--gce-ratelimit=ga.Addresses.Get,qps,1.5,5 +(limit ga.Addresses.Get to maximum of 1.5 qps with a burst of 5). +Use the flag more than once to rate limit more than one call.`) flag.StringVar(&F.HealthCheckPath, "health-check-path", "/", `Path used to health-check a backend service. All Services must serve a 200 page on this path. Currently this is only configurable globally.`) @@ -113,6 +120,25 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5 `This flag has been deprecated and no longer has any effect.`) } +type RateLimitSpecs struct { + specs []string +} + +// Part of the flag.Value interface. +func (r *RateLimitSpecs) String() string { + return strings.Join(r.specs, ";") +} + +// Set supports the flag being repeated multiple times. Part of the flag.Value interface. +func (r *RateLimitSpecs) Set(value string) error { + r.specs = append(r.specs, value) + return nil +} + +func (r *RateLimitSpecs) Values() []string { + return r.specs +} + type PortRanges struct { ports []string isSet bool diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go new file mode 100644 index 0000000000..b96a12f941 --- /dev/null +++ b/pkg/ratelimit/ratelimit.go @@ -0,0 +1,142 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimit + +import ( + "context" + "fmt" + "strconv" + "strings" + + "github.com/golang/glog" + "k8s.io/client-go/util/flowcontrol" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" +) + +// GCERateLimiter implements cloud.RateLimiter +type GCERateLimiter struct { + // Map a RateLimitKey to its rate limiter implementation. + rateLimitImpls map[*cloud.RateLimitKey]flowcontrol.RateLimiter +} + +// NewGCERateLimiter parses the list of rate limiting specs passed in and +// returns a properly configured cloud.RateLimiter implementation. +// Expected format of specs: {"[version].[service].[operation],[type],[param1],[param2],..", "..."} +func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) { + rateLimitImpls := make(map[*cloud.RateLimitKey]flowcontrol.RateLimiter) + // Within each specification, split on comma to get the operation, + // rate limiter type, and extra parameters. + for _, spec := range specs { + params := strings.Split(spec, ",") + if len(params) < 2 { + return nil, fmt.Errorf("Must at least specify operation and rate limiter type.") + } + // params[0] should consist of the operation to rate limit. + key, err := constructRateLimitKey(params[0]) + if err != nil { + return nil, err + } + // params[1:] should consist of the rate limiter type and extra params. + impl, err := constructRateLimitImpl(params[1:]) + if err != nil { + return nil, err + } + rateLimitImpls[key] = impl + glog.Infof("Configured rate limiting for: %v", key) + } + if len(rateLimitImpls) == 0 { + return nil, nil + } + return &GCERateLimiter{rateLimitImpls}, nil +} + +// Implementation of cloud.RateLimiter +func (l *GCERateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error { + ch := make(chan struct{}) + go func() { + // Call flowcontrol.RateLimiter implementation. + impl := l.rateLimitImpl(key) + if impl != nil { + impl.Accept() + } + close(ch) + }() + select { + case <-ch: + break + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + +// rateLimitImpl returns the flowcontrol.RateLimiter implementation +// associated with the passed in key. +func (l *GCERateLimiter) rateLimitImpl(key *cloud.RateLimitKey) flowcontrol.RateLimiter { + // Since the passed in key will have the ProjectID field filled in, we need to + // create a copy which does not, so that retreiving the rate limiter implementation + // through the map works as expected. + keyCopy := &cloud.RateLimitKey{ + ProjectID: "", + Operation: key.Operation, + Version: key.Version, + Service: key.Service, + } + return l.rateLimitImpls[keyCopy] +} + +// Expected format of param is [version].[service].[operation] +func constructRateLimitKey(param string) (*cloud.RateLimitKey, error) { + params := strings.Split(param, ".") + if len(params) != 3 { + return nil, fmt.Errorf("Must specify operation in [version].[service].[operation] format.") + } + // TODO(rramkumar): Add another layer of validation here? + version := meta.Version(params[0]) + service := params[1] + operation := params[2] + return &cloud.RateLimitKey{ + ProjectID: "", + Operation: operation, + Version: version, + Service: service, + }, nil +} + +// constructRateLimitImpl parses the slice and returns a flowcontrol.RateLimiter +// Expected format is [type],[param1],[param2],... +func constructRateLimitImpl(params []string) (flowcontrol.RateLimiter, error) { + // For now, only the "qps" type is supported. + rlType := params[0] + implArgs := params[1:] + if rlType == "qps" { + if len(implArgs) != 2 { + return nil, fmt.Errorf("Invalid number of args for rate limiter type %v. Expected %d, Got %v", rlType, 2, len(implArgs)) + } + qps, err := strconv.ParseFloat(implArgs[0], 32) + if err != nil || qps <= 0 { + return nil, fmt.Errorf("Invalid argument for rate limiter type %v. Either %v is not a float or not greater than 0.", rlType, implArgs[0]) + } + burst, err := strconv.Atoi(implArgs[1]) + if err != nil { + return nil, fmt.Errorf("Invalid argument for rate limiter type %v. Expected %v to be a int.", rlType, implArgs[1]) + } + return flowcontrol.NewTokenBucketRateLimiter(float32(qps), burst), nil + } + return nil, fmt.Errorf("Invalid rate limiter type provided: %v", rlType) +} diff --git a/pkg/ratelimit/ratelimit_test.go b/pkg/ratelimit/ratelimit_test.go new file mode 100644 index 0000000000..1aa9114b9a --- /dev/null +++ b/pkg/ratelimit/ratelimit_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ratelimit + +import ( + "testing" +) + +func TestConfigureGCERateLimiting(t *testing.T) { + validTestCases := [][]string{ + []string{"ga.Addresses.Get,qps,1.5,5"}, + []string{"ga.Addresses.List,qps,2,10"}, + []string{"ga.Addresses.Get,qps,1.5,5", "ga.Firewalls.Get,qps,1.5,5"}, + } + invalidTestCases := [][]string{ + []string{"gaAddresses.Get,qps,1.5,5"}, + []string{"gaAddresses.Get,qps,0,5"}, + []string{"gaAddresses.Get,qps,-1,5"}, + []string{"ga.Addresses.Get,qps,1.5.5"}, + []string{"gaAddresses.Get,qps,1.5,5.5"}, + []string{"gaAddressesGet,qps,1.5,5.5"}, + []string{"gaAddressesGet,qps,1.5"}, + []string{"ga.Addresses.Get,foo,1.5,5"}, + []string{"ga.Addresses.Get,1.5,5"}, + []string{"ga.Addresses.Get,qps,1.5,5", "gaFirewalls.Get,qps,1.5,5"}, + } + + for _, testCase := range validTestCases { + _, err := NewGCERateLimiter(testCase) + if err != nil { + t.Errorf("Did not expect an error for test case: %v", testCase) + } + } + + for _, testCase := range invalidTestCases { + _, err := NewGCERateLimiter(testCase) + if err == nil { + t.Errorf("Expected an error for test case: %v", testCase) + } + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce.go b/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce.go index eb2b76a013..ef56080968 100644 --- a/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce.go +++ b/vendor/k8s.io/kubernetes/pkg/cloudprovider/providers/gce/gce.go @@ -151,6 +151,9 @@ type GCECloud struct { // New code generated interface to the GCE compute library. c cloud.Cloud + + // Keep a reference of this around so we can inject a new cloud.RateLimiter implementation. + s *cloud.Service } // TODO: replace gcfg with json @@ -515,17 +518,27 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) { } gce.manager = &gceServiceManager{gce} - gce.c = cloud.NewGCE(&cloud.Service{ + gce.s = &cloud.Service{ GA: service, Alpha: serviceAlpha, Beta: serviceBeta, ProjectRouter: &gceProjectRouter{gce}, RateLimiter: &gceRateLimiter{gce}, - }) + } + gce.c = cloud.NewGCE(gce.s) return gce, nil } +// SetRateLimiter adds a custom cloud.RateLimiter implementation. +// WARNING: Calling this could have unexpected behavior if you have in-flight +// requests. It is best to use this immediately after creating a GCECloud. +func (g *GCECloud) SetRateLimiter(rl cloud.RateLimiter) { + if rl != nil { + g.s.RateLimiter = rl + } +} + // determineSubnetURL queries for all subnetworks in a region for a given network and returns // the URL of the subnetwork which exists in the auto-subnet range. func determineSubnetURL(service *compute.Service, networkProjectID, networkName, region string) (string, error) { From d51ce72728924a28eb0a19e8fbc90da05998d7ff Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Thu, 15 Mar 2018 16:58:35 -0700 Subject: [PATCH 2/4] Implement globbing - big change --- pkg/flags/flags.go | 14 +++- pkg/ratelimit/ratelimit.go | 118 ++++++++++++++++++++++++++++---- pkg/ratelimit/ratelimit_test.go | 3 + pkg/ratelimit/util.go | 0 4 files changed, 118 insertions(+), 17 deletions(-) create mode 100644 pkg/ratelimit/util.go diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 9e72b746e8..b46e1a4069 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -33,6 +33,8 @@ const ( // DefaultNodePortRange is the list of ports or port ranges used by kubernetes for // allocating NodePort services. DefaultNodePortRange = "30000-32767" + // DefaultGCERateLimit is the default rate limit spec. + DefaultGCERateLimit = "*.Operation.Get,qps,10,100" ) var ( @@ -59,7 +61,7 @@ var ( func init() { F.NodePortRanges.ports = []string{DefaultNodePortRange} - F.GCERateLimit.specs = []string{} + F.GCERateLimit.specs = []string{DefaultGCERateLimit} } // Register flags with the command line parser. @@ -89,8 +91,7 @@ normal environments the controller should only delete a loadbalancer if the associated Ingress is deleted.`) flag.Var(&F.GCERateLimit, "gce-ratelimit", `Optional, can be used to rate limit certain GCE API calls. Example usage: ---gce-ratelimit=ga.Addresses.Get,qps,1.5,5 -(limit ga.Addresses.Get to maximum of 1.5 qps with a burst of 5). +--gce-ratelimit=ga.Addresses.Get,qps,1.5,5 (limit ga.Addresses.Get to maximum of 1.5 qps with a burst of 5). Use the flag more than once to rate limit more than one call.`) flag.StringVar(&F.HealthCheckPath, "health-check-path", "/", `Path used to health-check a backend service. All Services must serve a @@ -122,6 +123,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5 type RateLimitSpecs struct { specs []string + isSet bool } // Part of the flag.Value interface. @@ -131,6 +133,12 @@ func (r *RateLimitSpecs) String() string { // Set supports the flag being repeated multiple times. Part of the flag.Value interface. func (r *RateLimitSpecs) Set(value string) error { + // On first Set(), clear the default. On subsequent Set()'s, append. + if !r.isSet { + r.isSet = true + // Remove the default + r.specs = []string{} + } r.specs = append(r.specs, value) return nil } diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go index b96a12f941..74c50fd59a 100644 --- a/pkg/ratelimit/ratelimit.go +++ b/pkg/ratelimit/ratelimit.go @@ -47,7 +47,7 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) { return nil, fmt.Errorf("Must at least specify operation and rate limiter type.") } // params[0] should consist of the operation to rate limit. - key, err := constructRateLimitKey(params[0]) + keys, err := constructRateLimitKeys(params[0]) if err != nil { return nil, err } @@ -56,8 +56,11 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) { if err != nil { return nil, err } - rateLimitImpls[key] = impl - glog.Infof("Configured rate limiting for: %v", key) + // For each spec, the rate limiter type is the same for all keys generated. + for _, key := range keys { + rateLimitImpls[key] = impl + glog.Infof("Configured rate limiting for: %v", key) + } } if len(rateLimitImpls) == 0 { return nil, nil @@ -67,6 +70,7 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) { // Implementation of cloud.RateLimiter func (l *GCERateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error { + // If the rate limiter is empty, the do some default ch := make(chan struct{}) go func() { // Call flowcontrol.RateLimiter implementation. @@ -101,21 +105,75 @@ func (l *GCERateLimiter) rateLimitImpl(key *cloud.RateLimitKey) flowcontrol.Rate } // Expected format of param is [version].[service].[operation] -func constructRateLimitKey(param string) (*cloud.RateLimitKey, error) { +// this could return more than one cloud.RateLimitKey if "*" is used for either +// version, service, or operation (or more than one of those). +func constructRateLimitKeys(param string) ([]*cloud.RateLimitKey, error) { params := strings.Split(param, ".") if len(params) != 3 { return nil, fmt.Errorf("Must specify operation in [version].[service].[operation] format.") } - // TODO(rramkumar): Add another layer of validation here? - version := meta.Version(params[0]) - service := params[1] - operation := params[2] - return &cloud.RateLimitKey{ - ProjectID: "", - Operation: operation, - Version: version, - Service: service, - }, nil + keys := []*cloud.RateLimitKey{} + + // First parse the version. + versions := []meta.Version{} + if params[0] == "*" { + versions = meta.AllVersions + } else { + // Validate that the full provided version exists + if versionExists(params[0]) { + versions = append(versions, meta.Version(params[0])) + } else { + return nil, fmt.Errorf("Invalid version specified: %v", params[0]) + } + } + // For each version we get, parse the service. + for _, version := range versions { + // Construct a list of all possible services for the version. + services := []string{} + if params[1] == "*" { + for _, serviceInfo := meta.AllServices { + // Only include in the list of possible services if the service is + // available at the particular version we are looking at now. + if serviceInfo.Version() == version { + services = append(services, serviceInfo.Service) + } + } + } else { + // Validate that the full provided service exists. + if serviceExists(params[1]) { + services = append(services, params[1]) + } else { + return nil, fmt.Errorf("Invalid service specified: %v", params[1]) + } + } + // For each service we get, parse the operation. + for _, service := range services { + // These operation exist for every service. + operations := []string{} + if params[2] == "*" { + // Default for every service, regardless of version. + // TODO(rramkumar): Implement support for additional methods. + operations = []string{"Get", "List", "Insert", "Delete"} + } else { + // Validate that the full provided operation exists. + if operationExists(params[2]) { + operations = append(operations, params[2]) + } else { + return nil, fmt.Errorf("Invalid operation specified: %v", params[2]) + } + } + for _, operation := range operations { + key := &cloud.RateLimitKey{ + ProjectID: "", + Operation: operation, + Version: version, + Service: service, + } + keys = append(keys, key) + } + } + } + return keys, nil } // constructRateLimitImpl parses the slice and returns a flowcontrol.RateLimiter @@ -140,3 +198,35 @@ func constructRateLimitImpl(params []string) (flowcontrol.RateLimiter, error) { } return nil, fmt.Errorf("Invalid rate limiter type provided: %v", rlType) } + +// versionExists returns true if the passed in string is a valid meta.Version. +func versionExists(s string) bool { + for _, version := range meta.AllVersions { + if meta.Version(s) == version { + return true + } + } + return false +} + +// serviceExists returns true if the passed in string refers to a valid GCE service. +func serviceExists(s string) bool { + for _, serviceInfo := range meta.AllServices { + if s == serviceInfo.Service { + return true + } + } + return false +} + +// operationExists returns true if the passed string refers to a valid operation. +// Current valid operations are "Get", "List", "Insert", "Delete" +// TODO(rramkumar): Implement support for more methods. +func operationExists(s string) bool { + for _, operation := []string{"Get", "List", "Insert", "Delete"} { + if s == operation { + return true + } + } + return false +} diff --git a/pkg/ratelimit/ratelimit_test.go b/pkg/ratelimit/ratelimit_test.go index 1aa9114b9a..897e294ac3 100644 --- a/pkg/ratelimit/ratelimit_test.go +++ b/pkg/ratelimit/ratelimit_test.go @@ -25,6 +25,7 @@ func TestConfigureGCERateLimiting(t *testing.T) { []string{"ga.Addresses.Get,qps,1.5,5"}, []string{"ga.Addresses.List,qps,2,10"}, []string{"ga.Addresses.Get,qps,1.5,5", "ga.Firewalls.Get,qps,1.5,5"}, + []string{"ga.Operations.Get,qps,10,100"}, } invalidTestCases := [][]string{ []string{"gaAddresses.Get,qps,1.5,5"}, @@ -44,6 +45,7 @@ func TestConfigureGCERateLimiting(t *testing.T) { if err != nil { t.Errorf("Did not expect an error for test case: %v", testCase) } + // TODO(rramkumar): Verify the internal map for the GCERateLimiter } for _, testCase := range invalidTestCases { @@ -51,5 +53,6 @@ func TestConfigureGCERateLimiting(t *testing.T) { if err == nil { t.Errorf("Expected an error for test case: %v", testCase) } + // TODO(rramkumar): Verify the internal map for the GCERateLimiter } } diff --git a/pkg/ratelimit/util.go b/pkg/ratelimit/util.go new file mode 100644 index 0000000000..e69de29bb2 From 8efd96214e7a86be184d1389fb7680af96762402 Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Thu, 15 Mar 2018 17:15:35 -0700 Subject: [PATCH 3/4] Revert "Implement globbing - big change" This reverts commit d51ce72728924a28eb0a19e8fbc90da05998d7ff. --- pkg/flags/flags.go | 14 +--- pkg/ratelimit/ratelimit.go | 118 ++++---------------------------- pkg/ratelimit/ratelimit_test.go | 3 - pkg/ratelimit/util.go | 0 4 files changed, 17 insertions(+), 118 deletions(-) delete mode 100644 pkg/ratelimit/util.go diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index b46e1a4069..9e72b746e8 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -33,8 +33,6 @@ const ( // DefaultNodePortRange is the list of ports or port ranges used by kubernetes for // allocating NodePort services. DefaultNodePortRange = "30000-32767" - // DefaultGCERateLimit is the default rate limit spec. - DefaultGCERateLimit = "*.Operation.Get,qps,10,100" ) var ( @@ -61,7 +59,7 @@ var ( func init() { F.NodePortRanges.ports = []string{DefaultNodePortRange} - F.GCERateLimit.specs = []string{DefaultGCERateLimit} + F.GCERateLimit.specs = []string{} } // Register flags with the command line parser. @@ -91,7 +89,8 @@ normal environments the controller should only delete a loadbalancer if the associated Ingress is deleted.`) flag.Var(&F.GCERateLimit, "gce-ratelimit", `Optional, can be used to rate limit certain GCE API calls. Example usage: ---gce-ratelimit=ga.Addresses.Get,qps,1.5,5 (limit ga.Addresses.Get to maximum of 1.5 qps with a burst of 5). +--gce-ratelimit=ga.Addresses.Get,qps,1.5,5 +(limit ga.Addresses.Get to maximum of 1.5 qps with a burst of 5). Use the flag more than once to rate limit more than one call.`) flag.StringVar(&F.HealthCheckPath, "health-check-path", "/", `Path used to health-check a backend service. All Services must serve a @@ -123,7 +122,6 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5 type RateLimitSpecs struct { specs []string - isSet bool } // Part of the flag.Value interface. @@ -133,12 +131,6 @@ func (r *RateLimitSpecs) String() string { // Set supports the flag being repeated multiple times. Part of the flag.Value interface. func (r *RateLimitSpecs) Set(value string) error { - // On first Set(), clear the default. On subsequent Set()'s, append. - if !r.isSet { - r.isSet = true - // Remove the default - r.specs = []string{} - } r.specs = append(r.specs, value) return nil } diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go index 74c50fd59a..b96a12f941 100644 --- a/pkg/ratelimit/ratelimit.go +++ b/pkg/ratelimit/ratelimit.go @@ -47,7 +47,7 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) { return nil, fmt.Errorf("Must at least specify operation and rate limiter type.") } // params[0] should consist of the operation to rate limit. - keys, err := constructRateLimitKeys(params[0]) + key, err := constructRateLimitKey(params[0]) if err != nil { return nil, err } @@ -56,11 +56,8 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) { if err != nil { return nil, err } - // For each spec, the rate limiter type is the same for all keys generated. - for _, key := range keys { - rateLimitImpls[key] = impl - glog.Infof("Configured rate limiting for: %v", key) - } + rateLimitImpls[key] = impl + glog.Infof("Configured rate limiting for: %v", key) } if len(rateLimitImpls) == 0 { return nil, nil @@ -70,7 +67,6 @@ func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) { // Implementation of cloud.RateLimiter func (l *GCERateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error { - // If the rate limiter is empty, the do some default ch := make(chan struct{}) go func() { // Call flowcontrol.RateLimiter implementation. @@ -105,75 +101,21 @@ func (l *GCERateLimiter) rateLimitImpl(key *cloud.RateLimitKey) flowcontrol.Rate } // Expected format of param is [version].[service].[operation] -// this could return more than one cloud.RateLimitKey if "*" is used for either -// version, service, or operation (or more than one of those). -func constructRateLimitKeys(param string) ([]*cloud.RateLimitKey, error) { +func constructRateLimitKey(param string) (*cloud.RateLimitKey, error) { params := strings.Split(param, ".") if len(params) != 3 { return nil, fmt.Errorf("Must specify operation in [version].[service].[operation] format.") } - keys := []*cloud.RateLimitKey{} - - // First parse the version. - versions := []meta.Version{} - if params[0] == "*" { - versions = meta.AllVersions - } else { - // Validate that the full provided version exists - if versionExists(params[0]) { - versions = append(versions, meta.Version(params[0])) - } else { - return nil, fmt.Errorf("Invalid version specified: %v", params[0]) - } - } - // For each version we get, parse the service. - for _, version := range versions { - // Construct a list of all possible services for the version. - services := []string{} - if params[1] == "*" { - for _, serviceInfo := meta.AllServices { - // Only include in the list of possible services if the service is - // available at the particular version we are looking at now. - if serviceInfo.Version() == version { - services = append(services, serviceInfo.Service) - } - } - } else { - // Validate that the full provided service exists. - if serviceExists(params[1]) { - services = append(services, params[1]) - } else { - return nil, fmt.Errorf("Invalid service specified: %v", params[1]) - } - } - // For each service we get, parse the operation. - for _, service := range services { - // These operation exist for every service. - operations := []string{} - if params[2] == "*" { - // Default for every service, regardless of version. - // TODO(rramkumar): Implement support for additional methods. - operations = []string{"Get", "List", "Insert", "Delete"} - } else { - // Validate that the full provided operation exists. - if operationExists(params[2]) { - operations = append(operations, params[2]) - } else { - return nil, fmt.Errorf("Invalid operation specified: %v", params[2]) - } - } - for _, operation := range operations { - key := &cloud.RateLimitKey{ - ProjectID: "", - Operation: operation, - Version: version, - Service: service, - } - keys = append(keys, key) - } - } - } - return keys, nil + // TODO(rramkumar): Add another layer of validation here? + version := meta.Version(params[0]) + service := params[1] + operation := params[2] + return &cloud.RateLimitKey{ + ProjectID: "", + Operation: operation, + Version: version, + Service: service, + }, nil } // constructRateLimitImpl parses the slice and returns a flowcontrol.RateLimiter @@ -198,35 +140,3 @@ func constructRateLimitImpl(params []string) (flowcontrol.RateLimiter, error) { } return nil, fmt.Errorf("Invalid rate limiter type provided: %v", rlType) } - -// versionExists returns true if the passed in string is a valid meta.Version. -func versionExists(s string) bool { - for _, version := range meta.AllVersions { - if meta.Version(s) == version { - return true - } - } - return false -} - -// serviceExists returns true if the passed in string refers to a valid GCE service. -func serviceExists(s string) bool { - for _, serviceInfo := range meta.AllServices { - if s == serviceInfo.Service { - return true - } - } - return false -} - -// operationExists returns true if the passed string refers to a valid operation. -// Current valid operations are "Get", "List", "Insert", "Delete" -// TODO(rramkumar): Implement support for more methods. -func operationExists(s string) bool { - for _, operation := []string{"Get", "List", "Insert", "Delete"} { - if s == operation { - return true - } - } - return false -} diff --git a/pkg/ratelimit/ratelimit_test.go b/pkg/ratelimit/ratelimit_test.go index 897e294ac3..1aa9114b9a 100644 --- a/pkg/ratelimit/ratelimit_test.go +++ b/pkg/ratelimit/ratelimit_test.go @@ -25,7 +25,6 @@ func TestConfigureGCERateLimiting(t *testing.T) { []string{"ga.Addresses.Get,qps,1.5,5"}, []string{"ga.Addresses.List,qps,2,10"}, []string{"ga.Addresses.Get,qps,1.5,5", "ga.Firewalls.Get,qps,1.5,5"}, - []string{"ga.Operations.Get,qps,10,100"}, } invalidTestCases := [][]string{ []string{"gaAddresses.Get,qps,1.5,5"}, @@ -45,7 +44,6 @@ func TestConfigureGCERateLimiting(t *testing.T) { if err != nil { t.Errorf("Did not expect an error for test case: %v", testCase) } - // TODO(rramkumar): Verify the internal map for the GCERateLimiter } for _, testCase := range invalidTestCases { @@ -53,6 +51,5 @@ func TestConfigureGCERateLimiting(t *testing.T) { if err == nil { t.Errorf("Expected an error for test case: %v", testCase) } - // TODO(rramkumar): Verify the internal map for the GCERateLimiter } } diff --git a/pkg/ratelimit/util.go b/pkg/ratelimit/util.go deleted file mode 100644 index e69de29bb2..0000000000 From 8030415722e47bb543f2ca022f06d064049d4e08 Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Thu, 15 Mar 2018 17:28:33 -0700 Subject: [PATCH 4/4] Finishing touches --- cmd/glbc/app/clients.go | 2 +- pkg/flags/flags.go | 15 +++++++++++++-- pkg/ratelimit/ratelimit.go | 16 +++++++++------- pkg/ratelimit/ratelimit_test.go | 1 + 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/cmd/glbc/app/clients.go b/cmd/glbc/app/clients.go index 88e5b56b7f..8f6fe4c059 100644 --- a/cmd/glbc/app/clients.go +++ b/cmd/glbc/app/clients.go @@ -99,7 +99,7 @@ func NewGCEClient() *gce.GCECloud { // Configure GCE rate limiting rl, err := ratelimit.NewGCERateLimiter(flags.F.GCERateLimit.Values()) if err != nil { - glog.Errorf("Error in configuring rate limiting: %v", err) + glog.Fatalf("Error configuring rate limiting: %v", err) } cloud.SetRateLimiter(rl) // If this controller is scheduled on a node without compute/rw diff --git a/pkg/flags/flags.go b/pkg/flags/flags.go index 9e72b746e8..efe8ff5858 100644 --- a/pkg/flags/flags.go +++ b/pkg/flags/flags.go @@ -59,7 +59,7 @@ var ( func init() { F.NodePortRanges.ports = []string{DefaultNodePortRange} - F.GCERateLimit.specs = []string{} + F.GCERateLimit.specs = []string{"alpha.Operations.Get,qps,10,100", "beta.Operations.Get,qps,10,100", "ga.Operations.Get,qps,10,100"} } // Register flags with the command line parser. @@ -91,7 +91,11 @@ associated Ingress is deleted.`) `Optional, can be used to rate limit certain GCE API calls. Example usage: --gce-ratelimit=ga.Addresses.Get,qps,1.5,5 (limit ga.Addresses.Get to maximum of 1.5 qps with a burst of 5). -Use the flag more than once to rate limit more than one call.`) +Use the flag more than once to rate limit more than one call. If you do not +specify this flag, the default is to rate limit Operations.Get for all versions. +If you do specify this flag one or more times, this default will be overwritten. +If you want to still use the default, simply specify it along with your other +values.`) flag.StringVar(&F.HealthCheckPath, "health-check-path", "/", `Path used to health-check a backend service. All Services must serve a 200 page on this path. Currently this is only configurable globally.`) @@ -122,6 +126,7 @@ L7 load balancing. CSV values accepted. Example: -node-port-ranges=80,8080,400-5 type RateLimitSpecs struct { specs []string + isSet bool } // Part of the flag.Value interface. @@ -131,6 +136,12 @@ func (r *RateLimitSpecs) String() string { // Set supports the flag being repeated multiple times. Part of the flag.Value interface. func (r *RateLimitSpecs) Set(value string) error { + // On first Set(), clear the original defaults + // On subsequent Set()'s, append. + if !r.isSet { + r.specs = []string{} + r.isSet = true + } r.specs = append(r.specs, value) return nil } diff --git a/pkg/ratelimit/ratelimit.go b/pkg/ratelimit/ratelimit.go index b96a12f941..d2543d4a62 100644 --- a/pkg/ratelimit/ratelimit.go +++ b/pkg/ratelimit/ratelimit.go @@ -31,14 +31,14 @@ import ( // GCERateLimiter implements cloud.RateLimiter type GCERateLimiter struct { // Map a RateLimitKey to its rate limiter implementation. - rateLimitImpls map[*cloud.RateLimitKey]flowcontrol.RateLimiter + rateLimitImpls map[cloud.RateLimitKey]flowcontrol.RateLimiter } // NewGCERateLimiter parses the list of rate limiting specs passed in and // returns a properly configured cloud.RateLimiter implementation. // Expected format of specs: {"[version].[service].[operation],[type],[param1],[param2],..", "..."} func NewGCERateLimiter(specs []string) (*GCERateLimiter, error) { - rateLimitImpls := make(map[*cloud.RateLimitKey]flowcontrol.RateLimiter) + rateLimitImpls := make(map[cloud.RateLimitKey]flowcontrol.RateLimiter) // Within each specification, split on comma to get the operation, // rate limiter type, and extra parameters. for _, spec := range specs { @@ -91,7 +91,7 @@ func (l *GCERateLimiter) rateLimitImpl(key *cloud.RateLimitKey) flowcontrol.Rate // Since the passed in key will have the ProjectID field filled in, we need to // create a copy which does not, so that retreiving the rate limiter implementation // through the map works as expected. - keyCopy := &cloud.RateLimitKey{ + keyCopy := cloud.RateLimitKey{ ProjectID: "", Operation: key.Operation, Version: key.Version, @@ -101,21 +101,23 @@ func (l *GCERateLimiter) rateLimitImpl(key *cloud.RateLimitKey) flowcontrol.Rate } // Expected format of param is [version].[service].[operation] -func constructRateLimitKey(param string) (*cloud.RateLimitKey, error) { +func constructRateLimitKey(param string) (cloud.RateLimitKey, error) { + var retVal cloud.RateLimitKey params := strings.Split(param, ".") if len(params) != 3 { - return nil, fmt.Errorf("Must specify operation in [version].[service].[operation] format.") + return retVal, fmt.Errorf("Must specify rate limit in [version].[service].[operation] format: %v", param) } // TODO(rramkumar): Add another layer of validation here? version := meta.Version(params[0]) service := params[1] operation := params[2] - return &cloud.RateLimitKey{ + retVal = cloud.RateLimitKey{ ProjectID: "", Operation: operation, Version: version, Service: service, - }, nil + } + return retVal, nil } // constructRateLimitImpl parses the slice and returns a flowcontrol.RateLimiter diff --git a/pkg/ratelimit/ratelimit_test.go b/pkg/ratelimit/ratelimit_test.go index 1aa9114b9a..6e34c8c23b 100644 --- a/pkg/ratelimit/ratelimit_test.go +++ b/pkg/ratelimit/ratelimit_test.go @@ -25,6 +25,7 @@ func TestConfigureGCERateLimiting(t *testing.T) { []string{"ga.Addresses.Get,qps,1.5,5"}, []string{"ga.Addresses.List,qps,2,10"}, []string{"ga.Addresses.Get,qps,1.5,5", "ga.Firewalls.Get,qps,1.5,5"}, + []string{"ga.Operations.Get,qps,10,100"}, } invalidTestCases := [][]string{ []string{"gaAddresses.Get,qps,1.5,5"},