Skip to content
This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

Commit

Permalink
Sync all services at startup and on a 1m schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanprodan committed Oct 30, 2019
1 parent f4cc401 commit 6a6f6d6
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ DOCKER_REPOSITORY:=stefanprodan
DOCKER_IMAGE_NAME:=$(DOCKER_REPOSITORY)/$(NAME)

run:
go run cmd/kxds/*.go serve --kubeconfig=$$HOME/.kube/config
go run cmd/kxds/*.go serve --kubeconfig=$$HOME/.kube/config --ads=true

envoy:
envoy -c envoy.yaml -l info
Expand Down
4 changes: 3 additions & 1 deletion cmd/kxds/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ var kubeConfig string
var port int
var namespace string
var ads bool
var portName string

func init() {
serveCmd.Flags().StringVarP(&masterURL, "master", "", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
serveCmd.Flags().StringVarP(&kubeConfig, "kubeconfig", "", "", "Path to a kubeconfig. Only required if out-of-cluster.")
serveCmd.Flags().IntVarP(&port, "port", "p", 18000, "Port to listen on.")
serveCmd.Flags().StringVarP(&namespace, "namespace", "", "", "Namespace to watch for Kubernetes service.")
serveCmd.Flags().BoolVarP(&ads, "ads", "", false, "ADS flag forces a delay in responding to streaming requests until all resources are explicitly named in the request.")
serveCmd.Flags().StringVarP(&portName, "portName", "", "http", "Include Kubernetes service with this named port.")

rootCmd.AddCommand(serveCmd)
}
Expand Down Expand Up @@ -55,7 +57,7 @@ func serve(cmd *cobra.Command, args []string) error {
go srv.Serve(ctx)
srv.Report()

envoyConfig := discovery.NewEnvoyConfig(cache)
envoyConfig := discovery.NewEnvoyConfig(cache, portName)
controller := discovery.NewController(clientset, namespace, envoyConfig)
go controller.Run(2, stopCh)

Expand Down
5 changes: 3 additions & 2 deletions cmd/kxds/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,19 @@ package main

import (
"fmt"

"github.com/spf13/cobra"
)

var VERSION = "0.0.3-beta.1"
var VERSION = "0.0.3"

func init() {
rootCmd.AddCommand(versionCmd)
}

var versionCmd = &cobra.Command{
Use: `version`,
Short: "Prints podcli version",
Short: "Prints version",
RunE: func(cmd *cobra.Command, args []string) error {
fmt.Println(VERSION)
return nil
Expand Down
7 changes: 4 additions & 3 deletions kustomize/gateway/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ spec:
cpu: 10m
memory: 32Mi
volumeMounts:
- name: config
- name: xds-config
mountPath: /config
- name: kxds
image: stefanprodan/kxds:0.0.3-beta.1
image: stefanprodan/kxds:0.0.3
imagePullPolicy: IfNotPresent
securityContext:
capabilities:
Expand All @@ -87,6 +87,7 @@ spec:
args:
- serve
- --port=18000
- --ads=true
ports:
- name: grpc
containerPort: 18000
Expand All @@ -100,6 +101,6 @@ spec:
tcpSocket:
port: grpc
volumes:
- name: config
- name: xds-config
configMap:
name: envoy
26 changes: 23 additions & 3 deletions pkg/discovery/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package discovery

import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -10,7 +12,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"time"
)

type Controller struct {
Expand Down Expand Up @@ -67,12 +68,22 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) {
return
}

c.syncAll()

for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}

<-stopCh
klog.Info("stopping discovery workers")
tickChan := time.NewTicker(60 * time.Second).C
for {
select {
case <-tickChan:
c.syncAll()
case <-stopCh:
klog.Info("stopping discovery workers")
return
}
}
}

func (c *Controller) sync(key string) error {
Expand All @@ -94,6 +105,15 @@ func (c *Controller) sync(key string) error {
return nil
}

func (c *Controller) syncAll() {
for _, value := range c.indexer.List() {
svc := value.(*corev1.Service)
c.envoyConfig.Upsert(fmt.Sprintf("%s/%s", svc.Namespace, svc.Name), *svc)

}
c.envoyConfig.Sync()
}

func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
Expand Down
32 changes: 27 additions & 5 deletions pkg/discovery/envoy.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type EnvoyConfig struct {
version int32
cache cache.SnapshotCache
upstreams *sync.Map
portName string
}

type Upstream struct {
Expand All @@ -44,11 +45,12 @@ type Canary struct {
CanaryWeight int
}

func NewEnvoyConfig(cache cache.SnapshotCache) *EnvoyConfig {
func NewEnvoyConfig(cache cache.SnapshotCache, portName string) *EnvoyConfig {
return &EnvoyConfig{
version: 0,
cache: cache,
upstreams: new(sync.Map),
portName: portName,
}
}

Expand All @@ -57,7 +59,9 @@ func (e *EnvoyConfig) Delete(upstream string) {
}

func (e *EnvoyConfig) Upsert(upstream string, service corev1.Service) {
e.upstreams.Store(upstream, service)
if e.shouldSync(service) {
e.upstreams.Store(upstream, service)
}
}

func (e *EnvoyConfig) Sync() {
Expand All @@ -70,21 +74,20 @@ func (e *EnvoyConfig) Sync() {
e.upstreams.Range(func(key interface{}, value interface{}) bool {
item := key.(string)
service := value.(corev1.Service)
portName := "http"
ok, upstream := serviceToUpstream(service)
if !ok {
klog.Infof("service %s excluded, due to annotation", item)
return true
}

cluster, port := serviceToCluster(service, portName, 5000)
cluster, port := serviceToCluster(service, e.portName, 5000)
if cluster != nil {
upstream.Name = cluster.Name
upstream.Port = port
clusters = append(clusters, cluster)
domains[cluster.Name] = upstream
} else {
klog.Infof("service %s excluded, no port named '%s' found", item, portName)
klog.Infof("service %s excluded, no port named '%s' found", item, e.portName)
}
return true
})
Expand All @@ -105,6 +108,25 @@ func (e *EnvoyConfig) Sync() {
}
}

func (e *EnvoyConfig) shouldSync(svc corev1.Service) bool {
var port int32
for _, p := range svc.Spec.Ports {
if p.Name == e.portName {
port = p.Port
}
}
if port == 0 {
return false
}
exposeAn := "envoy.gateway.kubernetes.io/expose"
for key, value := range svc.Annotations {
if key == exposeAn && value == "false" {
return false
}
}
return true
}

func serviceToCluster(svc corev1.Service, portName string, timeout int) (cluster *ev2.Cluster, port uint32) {
for _, p := range svc.Spec.Ports {
if p.Name == portName {
Expand Down

0 comments on commit 6a6f6d6

Please sign in to comment.