Skip to content
This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

Commit

Permalink
Add weighted clusters support
Browse files Browse the repository at this point in the history
Turn ADS off and expose it as a flag
  • Loading branch information
stefanprodan committed Oct 29, 2019
1 parent 89444a4 commit f4cc401
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 41 deletions.
55 changes: 54 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ KxDS runs as a sidecar next to Envoy and configures the proxy to expose Kubernet
### Features

* **Service Discovery** KxDS watches Kubernetes for ClusterIP services with a `http` named port
* **Envoy Clusters (CDS)** are generated for each Kubernetes service in the form `<name>-<namespace>-<http-port>`
* **Envoy Clusters (CDS)** are generated for each Kubernetes service in the form `<service-name>-<namespace>-<http-port>`
* **Envoy Routes (RDS)** are generated for each cluster and mapped to the `<name>.<namespace>` domain
* **Envoy Weighted Clusters** are generated based on Kubernetes service annotations
* **Envoy Listeners (LDS)** KxDS configures Envoy to listen on port `8080` and sets up retry policies for each route

### Install
Expand All @@ -18,13 +19,65 @@ kubectl apply -k github.com/stefanprodan/kxds//kustomize/gateway

### Annotations

Kubernetes service:
```yaml
apiVersion: v1
kind: Service
metadata:
name: frontend
namespace: demo
annotations:
envoy.gateway.kubernetes.io/expose: "true"
envoy.gateway.kubernetes.io/timeout: "25s"
envoy.gateway.kubernetes.io/retries: "5"
envoy.gateway.kubernetes.io/domain: "app.internal"
spec:
ports:
- name: http
port: 9898
protocol: TCP
```
Envoy virtual host (generated by KxDS):
```yaml
name: frontend-demo-9898
domains:
- "podinfo.local"
request_headers_to_add:
- header:
key: "l5d-dst-override"
value: "frontend.demo.svc.cluster.local:9898"
request_headers_to_remove:
- "l5d-remote-ip"
- "l5d-server-id"
routes:
- match:
prefix: "/"
route:
cluster: frontend-demo-9898
host_rewrite: "frontend.demo"
timeout: 25s
retry_policy:
retry_on: "gateway-error,connect-failure,refused-stream"
num_retries: 5
per_try_timeout: 5s
```
Weighted destinations:
```yaml
apiVersion: v1
kind: Service
metadata:
name: podinfo
namespace: demo
annotations:
envoy.gateway.kubernetes.io/expose: "true"
envoy.gateway.kubernetes.io/domain: "podinfo.default"
envoy.gateway.kubernetes.io/primary: "podinfo-primary-demo-9898"
envoy.gateway.kubernetes.io/canary: "podinfo-canary-demo-9898"
envoy.gateway.kubernetes.io/canary-weight: "50"
```
The primary and canary name format is `<service-name>-<namespace>-<port>`.
Note that both Kubernetes services must exist or Envoy will reject the configuration.
4 changes: 3 additions & 1 deletion cmd/kxds/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ var masterURL string
var kubeConfig string
var port int
var namespace string
var ads bool

func init() {
serveCmd.Flags().StringVarP(&masterURL, "master", "", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
serveCmd.Flags().StringVarP(&kubeConfig, "kubeconfig", "", "", "Path to a kubeconfig. Only required if out-of-cluster.")
serveCmd.Flags().IntVarP(&port, "port", "p", 18000, "Port to listen on.")
serveCmd.Flags().StringVarP(&namespace, "namespace", "", "", "Namespace to watch for Kubernetes service.")
serveCmd.Flags().BoolVarP(&ads, "ads", "", false, "ADS flag forces a delay in responding to streaming requests until all resources are explicitly named in the request.")

rootCmd.AddCommand(serveCmd)
}
Expand All @@ -47,7 +49,7 @@ func serve(cmd *cobra.Command, args []string) error {
stopCh := signals.SetupSignalHandler()
ctx := context.Background()

cache := discovery.NewCache()
cache := discovery.NewCache(ads)

srv := server.NewServer(port, cache)
go srv.Serve(ctx)
Expand Down
2 changes: 1 addition & 1 deletion cmd/kxds/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"github.com/spf13/cobra"
)

var VERSION = "0.0.2"
var VERSION = "0.0.3-beta.1"

func init() {
rootCmd.AddCommand(versionCmd)
Expand Down
2 changes: 1 addition & 1 deletion kustomize/gateway/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ spec:
- name: config
mountPath: /config
- name: kxds
image: stefanprodan/kxds:0.0.2
image: stefanprodan/kxds:0.0.3-beta.1
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
Expand Down
2 changes: 1 addition & 1 deletion kustomize/gateway/envoy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ admin:
address:
socket_address:
address: 0.0.0.0
port_value: 9901
port_value: 8081

dynamic_resources:
ads_config:
Expand Down
2 changes: 0 additions & 2 deletions kustomize/gateway/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
namespace: default
generatorOptions:
disableNameSuffixHash: true
resources:
- deployment.yaml
- rbac.yaml
Expand Down
2 changes: 2 additions & 0 deletions kustomize/podinfo/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@ resources:
- hpa.yaml
- deployment.yaml
- service.yaml
- service-canary.yaml
- service-primary.yaml
21 changes: 21 additions & 0 deletions kustomize/podinfo/service-canary.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: v1
kind: Service
metadata:
name: podinfo-canary
labels:
app: podinfo-canary
annotations:
envoy.gateway.kubernetes.io/expose: "true"
spec:
type: ClusterIP
selector:
app: podinfo
ports:
- name: http
port: 9898
protocol: TCP
targetPort: http
- port: 9999
targetPort: grpc
protocol: TCP
name: grpc
21 changes: 21 additions & 0 deletions kustomize/podinfo/service-primary.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apiVersion: v1
kind: Service
metadata:
name: podinfo-primary
labels:
app: podinfo-primary
annotations:
envoy.gateway.kubernetes.io/expose: "true"
spec:
type: ClusterIP
selector:
app: podinfo
ports:
- name: http
port: 9898
protocol: TCP
targetPort: http
- port: 9999
targetPort: grpc
protocol: TCP
name: grpc
13 changes: 6 additions & 7 deletions kustomize/podinfo/service.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,16 @@ metadata:
envoy.gateway.kubernetes.io/expose: "true"
envoy.gateway.kubernetes.io/timeout: "25s"
envoy.gateway.kubernetes.io/retries: "5"
envoy.gateway.kubernetes.io/domain: "podinfo.local"
envoy.gateway.kubernetes.io/domain: "podinfo.default"
envoy.gateway.kubernetes.io/primary: "podinfo-primary-default-9898"
envoy.gateway.kubernetes.io/canary: "podinfo-canary-default-9898"
envoy.gateway.kubernetes.io/canary-weight: "50"
spec:
type: ClusterIP
selector:
app: podinfo
ports:
- name: http
port: 9898
protocol: TCP
targetPort: http
- port: 9999
targetPort: grpc
protocol: TCP
name: grpc
selector:
app: gateway
4 changes: 2 additions & 2 deletions pkg/discovery/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"k8s.io/klog"
)

func NewCache() cache.SnapshotCache {
return cache.NewSnapshotCache(true, Hasher{}, cacheLog{})
func NewCache(ads bool) cache.SnapshotCache {
return cache.NewSnapshotCache(ads, Hasher{}, cacheLog{})
}

type Hasher struct {
Expand Down
90 changes: 65 additions & 25 deletions pkg/discovery/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ type Upstream struct {
Prefix string
Retries uint32
Timeout time.Duration
Canary *Canary
}

type Canary struct {
PrimaryCluster string
CanaryCluster string
CanaryWeight int
}

func NewEnvoyConfig(cache cache.SnapshotCache) *EnvoyConfig {
Expand Down Expand Up @@ -148,10 +155,50 @@ func makeAddress(address string, port uint32) *ecore.Address {
}

func makeVirtualHost(name string, upstream Upstream) route.VirtualHost {
action := &route.RouteAction{
HostRewriteSpecifier: &route.RouteAction_HostRewrite{
HostRewrite: upstream.Host,
},
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: name,
},
Timeout: ptypes.DurationProto(upstream.Timeout),
}

if upstream.Canary != nil && upstream.Canary.CanaryCluster != "" && upstream.Canary.PrimaryCluster != "" {
action = &route.RouteAction{
HostRewriteSpecifier: &route.RouteAction_HostRewrite{
HostRewrite: upstream.Host,
},
ClusterSpecifier: &route.RouteAction_WeightedClusters{
WeightedClusters: &route.WeightedCluster{
Clusters: []*route.WeightedCluster_ClusterWeight{
{
Name: upstream.Canary.CanaryCluster,
Weight: &wrappers.UInt32Value{Value: uint32(upstream.Canary.CanaryWeight)},
},
{
Name: upstream.Canary.PrimaryCluster,
Weight: &wrappers.UInt32Value{Value: uint32(100 - upstream.Canary.CanaryWeight)},
},
},
},
},
Timeout: ptypes.DurationProto(upstream.Timeout),
}
}

r := &route.Route{
Match: makeRouteMatch(upstream.Prefix),
Action: makeRouteAction(name, upstream.Timeout, upstream.Host),
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: upstream.Prefix,
},
},
Action: &route.Route_Route{
Route: action,
},
}

return route.VirtualHost{
Name: name,
Domains: []string{upstream.Domain},
Expand All @@ -177,28 +224,6 @@ func makeRetryPolicy(retries uint32) *route.RetryPolicy {
}
}

func makeRouteMatch(prefix string) *route.RouteMatch {
return &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: prefix,
},
}
}

func makeRouteAction(cluster string, timeout time.Duration, hostRewrite string) *route.Route_Route {
return &route.Route_Route{
Route: &route.RouteAction{
HostRewriteSpecifier: &route.RouteAction_HostRewrite{
HostRewrite: hostRewrite,
},
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: cluster,
},
Timeout: ptypes.DurationProto(timeout),
},
}
}

func makeConnectionManager(routeName string, vhosts []*route.VirtualHost, drainTimeout int) *hcm.HttpConnectionManager {
return &hcm.HttpConnectionManager{
CodecType: hcm.HttpConnectionManager_AUTO,
Expand Down Expand Up @@ -243,13 +268,16 @@ func serviceToUpstream(svc corev1.Service) (bool, Upstream) {
Prefix: "/",
Retries: 1,
Timeout: 15 * time.Second,
Canary: &Canary{},
}

exposeAn := "envoy.gateway.kubernetes.io/expose"
domainAn := "envoy.gateway.kubernetes.io/domain"
timeoutAn := "envoy.gateway.kubernetes.io/timeout"
retriesAn := "envoy.gateway.kubernetes.io/retries"

primaryAn := "envoy.gateway.kubernetes.io/primary"
canaryAn := "envoy.gateway.kubernetes.io/canary"
canaryWeightAn := "envoy.gateway.kubernetes.io/canary-weight"
for key, value := range svc.Annotations {
if key == exposeAn && value == "false" {
expose = false
Expand All @@ -269,6 +297,18 @@ func serviceToUpstream(svc corev1.Service) (bool, Upstream) {
up.Retries = uint32(r)
}
}
if key == primaryAn {
up.Canary.PrimaryCluster = value
}
if key == canaryAn {
up.Canary.CanaryCluster = value
}
if key == canaryWeightAn {
r, err := strconv.Atoi(value)
if err == nil {
up.Canary.CanaryWeight = r
}
}
}
return expose, up
}

0 comments on commit f4cc401

Please sign in to comment.