From e512edbdceef74fbb450aef8f4460c8c94bc0abc Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 11 Sep 2019 17:58:44 -0700 Subject: [PATCH 1/3] separate out ManufacureSink so that we can push in a new style of config * v1 config is where we have a flat structure, this is backward compatible * v2 is new nested structure I have also added test cases for the changes I have brought in. Signed-off-by: Vigith Maurice --- sinks/interfaces.go | 42 ++++++++++++-- sinks/interfaces_test.go | 116 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 154 insertions(+), 4 deletions(-) create mode 100644 sinks/interfaces_test.go diff --git a/sinks/interfaces.go b/sinks/interfaces.go index ac07f223e..980911b97 100644 --- a/sinks/interfaces.go +++ b/sinks/interfaces.go @@ -18,6 +18,7 @@ package sinks import ( "errors" + "fmt" "github.com/golang/glog" "github.com/spf13/viper" @@ -29,10 +30,43 @@ type EventSinkInterface interface { UpdateEvents(eNew *v1.Event, eOld *v1.Event) } -// ManufactureSink will manufacture a sink according to viper configs -// TODO: Determine if it should return an array of sinks -func ManufactureSink() (e EventSinkInterface) { - s := viper.GetString("sink") +// ManufactureSink will manufacture a sink according to viper configs and return a list of sink interfaces +func ManufactureSink() (e []EventSinkInterface) { + // if we find "sink", it means we are using the older style config + sink := viper.GetString("sink") + if len(sink) != 0 { + glog.Warning("Using older configuration format, please move to nested version") + return []EventSinkInterface{v1ManufactureSink(sink)} + } + + // if we have sinks, then we are using the new style + sinks := viper.GetStringSlice("sinks") + if len(sinks) != 0 { + glog.Info("Using new configuration format") + return v2ManufactureSink(sinks) + } + return +} + +// v2ManufactureSink is the later style of writing the config +func v2ManufactureSink(sinks []string) (e []EventSinkInterface) { + for _, sink := range sinks { + switch sink { + case "glog": + e = append(e, NewGlogSink()) + case "stdout": + viper.SetDefault("stdout.JSONNamespace", "") + stdoutNamespace := viper.GetString("stdout.JSONNamespace") + e = append(e, NewStdoutSink(stdoutNamespace)) + default: + panic(fmt.Sprintf("Invalid Sink Specified, %s", sink)) + } + } + return +} + +// v1ManufactureSink is the old way config.json format +func v1ManufactureSink(s string) (e EventSinkInterface) { glog.Infof("Sink is [%v]", s) switch s { case "glog": diff --git a/sinks/interfaces_test.go b/sinks/interfaces_test.go new file mode 100644 index 000000000..35879b122 --- /dev/null +++ b/sinks/interfaces_test.go @@ -0,0 +1,116 @@ +package sinks + +import ( + "bytes" + "github.com/spf13/viper" + "reflect" + "testing" +) + +func Test_oldManufactureSink(t *testing.T) { + type args struct { + s string + } + tests := []struct { + name string + args args + wantE EventSinkInterface + }{ + { + name: "glog", + args: args{s: "glog"}, + wantE: NewGlogSink(), + }, + { + name: "stdout", + args: args{s: "stdout"}, + wantE: NewStdoutSink(""), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotE := v1ManufactureSink(tt.args.s); !reflect.DeepEqual(gotE, tt.wantE) { + t.Errorf("oldManufactureSink() = %v, want %v", gotE, tt.wantE) + } + }) + } +} + +func Test_v2ManufactureSink(t *testing.T) { + type args struct { + sinks []string + } + tests := []struct { + name string + args args + wantE []EventSinkInterface + }{ + { + name: "golog", + args: args{sinks: []string{"glog"}}, + wantE: []EventSinkInterface{NewGlogSink()}, + }, + { + name: "stdout", + args: args{sinks: []string{"stdout"}}, + wantE: []EventSinkInterface{NewStdoutSink("")}, + }, + { + name: "glog_stdout", + args: args{sinks: []string{"glog", "stdout"}}, + wantE: []EventSinkInterface{NewGlogSink(), NewStdoutSink("")}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if gotE := v2ManufactureSink(tt.args.sinks); !reflect.DeepEqual(gotE, tt.wantE) { + t.Errorf("v2ManufactureSink() = %v, want %v", gotE, tt.wantE) + } + }) + } +} + +func Test_v2ManufactureSink_JSON(t *testing.T) { + var json = []byte(` +{ + "kubeconfig": "/var/run/kubernetes/admin.kubeconfig", + "sink": "glog" +} +`) + + viper.SetConfigType("json") + err := viper.ReadConfig(bytes.NewBuffer(json)) + if err != nil { + t.Fatalf("viper.ReadConfig Failed, %s", err) + } + + got := ManufactureSink() + wanted := []EventSinkInterface{NewGlogSink()} + if !reflect.DeepEqual(got, wanted) { + t.Errorf("v2ManufactureSink() = %v, want %v", got, wanted) + } +} + +func Test_v2ManufactureSink_NestedJSON(t *testing.T) { + var json = []byte(` +{ + "kubeconfig": "/var/run/kubernetes/admin.kubeconfig", + "sinks": ["glog", "stdout"], + "stdout" : { + "JSONNamespace": "foobar" + } +} +`) + viper.SetConfigType("json") + err := viper.ReadConfig(bytes.NewBuffer(json)) + if err != nil { + t.Fatalf("viper.ReadConfig Failed, %s", err) + } + + got := ManufactureSink() + wanted := []EventSinkInterface{NewGlogSink(), NewStdoutSink("foobar")} + if !reflect.DeepEqual(got, wanted) { + t.Errorf("v2ManufactureSink() = %v, want %v", got, wanted) + } + +} From 809147fdf39c221db884c0e8d6e80c6d9ae34d77 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Thu, 12 Sep 2019 08:38:34 -0700 Subject: [PATCH 2/3] * move controller to a separate method so we can inject fake client for testing * ability to pass context to controller that can be propagated * moving loading viper configs to a different method Signed-off-by: Vigith Maurice --- main.go | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/main.go b/main.go index 030530603..76068b1da 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "flag" "net/http" "os" @@ -57,9 +58,8 @@ func sigHandler() <-chan struct{} { return stop } -// loadConfig will parse input + config file and return a clientset -func loadConfig() kubernetes.Interface { - var config *rest.Config +// read viper configs +func loadViperConfig() { var err error flag.Parse() @@ -77,12 +77,21 @@ func loadConfig() kubernetes.Interface { panic(err.Error()) } - viper.BindEnv("kubeconfig") // Allows the KUBECONFIG env var to override where the kubeconfig is + err = viper.BindEnv("kubeconfig") // Allows the KUBECONFIG env var to override where the kubeconfig is + if err != nil { + glog.Fatalf("BindEnv Failed, %s", err) + } // Allow specifying a custom config file via the EVENTROUTER_CONFIG env var if forceCfg := os.Getenv("EVENTROUTER_CONFIG"); forceCfg != "" { viper.SetConfigFile(forceCfg) } +} + +// loadConfig will parse input + config file and return a clientset +func loadK8sConfig() kubernetes.Interface { + var config *rest.Config + var err error kubeconfig := viper.GetString("kubeconfig") if len(kubeconfig) > 0 { config, err = clientcmd.BuildConfigFromFlags("", kubeconfig) @@ -101,16 +110,13 @@ func loadConfig() kubernetes.Interface { return clientset } -// main entry point of the program -func main() { +func startController(ctx context.Context, clientset kubernetes.Interface) { var wg sync.WaitGroup - - clientset := loadConfig() sharedInformers := informers.NewSharedInformerFactory(clientset, viper.GetDuration("resync-interval")) eventsInformer := sharedInformers.Core().V1().Events() // TODO: Support locking for HA https://github.com/kubernetes/kubernetes/pull/42666 - eventRouter := NewEventRouter(clientset, eventsInformer) + eventRouter := NewEventRouter(ctx, clientset, eventsInformer) stop := sigHandler() // Startup the http listener for Prometheus Metrics endpoint. @@ -132,5 +138,19 @@ func main() { sharedInformers.Start(stop) wg.Wait() glog.Warningf("Exiting main()") +} + +// main entry point of the program +func main() { + loadViperConfig() + + clientset := loadK8sConfig() + + // we use context to cancel the go routines created in sink, etc. + ctx := context.Background() + + // makes testing easier if we separate out the controller + startController(ctx, clientset) + os.Exit(1) } From 91250a482830a0d238e3ae201683fa35ff02bab5 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Fri, 13 Sep 2019 18:11:48 -0700 Subject: [PATCH 3/3] * use context and pass the context to sinks * support concurrency for UpdateEvents by using channels (fan-out) * multi sink approach * added new config format support and is backward compatible * add tests for sinks which i changed * ran go fmt Signed-off-by: Vigith Maurice --- eventrouter.go | 23 ++++++---- go.sum | 4 ++ main.go | 17 +++++--- sinks/glogsink.go | 61 ++++++++++++++++++++++----- sinks/glogsink_test.go | 63 ++++++++++++++++++++++++++++ sinks/interfaces.go | 40 +++++++++++------- sinks/interfaces_test.go | 72 +++++++++++++++++++++++-------- sinks/stdoutsink.go | 91 ++++++++++++++++++++++++++++++---------- sinks/stdoutsink_test.go | 76 +++++++++++++++++++++++++++++++++ 9 files changed, 371 insertions(+), 76 deletions(-) create mode 100644 sinks/glogsink_test.go create mode 100644 sinks/stdoutsink_test.go diff --git a/eventrouter.go b/eventrouter.go index 3ea4f1997..0417730be 100644 --- a/eventrouter.go +++ b/eventrouter.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "context" "fmt" "github.com/golang/glog" @@ -62,6 +63,10 @@ func init() { // EventRouter is responsible for maintaining a stream of kubernetes // system Events and pushing them to another channel for storage type EventRouter struct { + // AddEventHandler do not take context, so we need to pass it around so that + // we can propagate cancel to the sinks + ctx context.Context + // kubeclient is the main kubernetes interface kubeClient kubernetes.Interface @@ -71,16 +76,16 @@ type EventRouter struct { // returns true if the event store has been synced eListerSynched cache.InformerSynced - // event sink - // TODO: Determine if we want to support multiple sinks. - eSink sinks.EventSinkInterface + // event sinks + eSink []sinks.EventSinkInterface } // NewEventRouter will create a new event router using the input params -func NewEventRouter(kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer) *EventRouter { +func NewEventRouter(ctx context.Context, kubeClient kubernetes.Interface, eventsInformer coreinformers.EventInformer) *EventRouter { er := &EventRouter{ + ctx: ctx, kubeClient: kubeClient, - eSink: sinks.ManufactureSink(), + eSink: sinks.ManufactureSink(ctx), } eventsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: er.addEvent, @@ -111,7 +116,9 @@ func (er *EventRouter) Run(stopCh <-chan struct{}) { func (er *EventRouter) addEvent(obj interface{}) { e := obj.(*v1.Event) prometheusEvent(e) - er.eSink.UpdateEvents(e, nil) + for _, eSink := range er.eSink { + eSink.UpdateEvents(e, nil) + } } // updateEvent is called any time there is an update to an existing event @@ -119,7 +126,9 @@ func (er *EventRouter) updateEvent(objOld interface{}, objNew interface{}) { eOld := objOld.(*v1.Event) eNew := objNew.(*v1.Event) prometheusEvent(eNew) - er.eSink.UpdateEvents(eNew, eOld) + for _, eSink := range er.eSink { + eSink.UpdateEvents(eNew, eOld) + } } // prometheusEvent is called when an event is added or updated diff --git a/go.sum b/go.sum index 69cdbff54..535c42ef7 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,7 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/evanphx/json-patch v0.0.0-20190203023257-5858425f7550/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/evanphx/json-patch v4.2.0+incompatible h1:fUDGZCv/7iAN7u0puUVhvKCcsR6vRfwrJatElLBEf0I= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -116,6 +117,7 @@ github.com/influxdata/influxdb v1.7.7 h1:UvNzAPfBrKMENVbQ4mr4ccA9sW+W1Ihl0Yh1s0B github.com/influxdata/influxdb v1.7.7/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -309,6 +311,7 @@ gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.0.0-20190620084959-7cf5895f2711/go.mod h1:TBhBqb1AWbBQbW3XRusr7n7E4v2+5ZY8r8sAMnyFC5A= @@ -326,6 +329,7 @@ k8s.io/klog v0.3.1/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= k8s.io/klog v0.4.0 h1:lCJCxf/LIowc2IGS9TPjWDyXY4nOmdGdfcwwDQCOURQ= k8s.io/klog v0.4.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/kube-openapi v0.0.0-20190228160746-b3a7cee44a30/go.mod h1:BXM9ceUBTj2QnfH2MK1odQs778ajze1RxcmP6S8RVVc= +k8s.io/kube-openapi v0.0.0-20190709113604-33be087ad058 h1:di3XCwddOR9cWBNpfgXaskhh6cgJuwcK54rvtwUaC10= k8s.io/kube-openapi v0.0.0-20190709113604-33be087ad058/go.mod h1:nfDlWeOsu3pUf4yWGL+ERqohP4YsZcBJXWMK+gkzOA4= k8s.io/utils v0.0.0-20190221042446-c2654d5206da/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= k8s.io/utils v0.0.0-20190809000727-6c36bc71fc4a h1:uy5HAgt4Ha5rEMbhZA+aM1j2cq5LmR6LQ71EYC2sVH4= diff --git a/main.go b/main.go index 76068b1da..cf6278ae2 100644 --- a/main.go +++ b/main.go @@ -40,7 +40,8 @@ import ( var addr = flag.String("listen-address", ":8080", "The address to listen on for HTTP requests.") // setup a signal hander to gracefully exit -func sigHandler() <-chan struct{} { +func sigHandler(ctx context.Context) (<-chan struct{}, context.Context) { + ctx, cancel := context.WithCancel(ctx) stop := make(chan struct{}) go func() { c := make(chan os.Signal, 1) @@ -51,11 +52,16 @@ func sigHandler() <-chan struct{} { syscall.SIGABRT, // Abnormal termination syscall.SIGILL, // illegal instruction syscall.SIGFPE) // floating point - this is why we can't have nice things - sig := <-c - glog.Warningf("Signal (%v) Detected, Shutting Down", sig) + select { + case sig := <-c: + glog.Warningf("Signal (%v) Detected, Shutting Down", sig) + case <-ctx.Done(): + glog.Warningf("cancelling ctx. ctx.Done invoked, %s", ctx.Err()) + } + cancel() close(stop) }() - return stop + return stop, ctx } // read viper configs @@ -115,9 +121,10 @@ func startController(ctx context.Context, clientset kubernetes.Interface) { sharedInformers := informers.NewSharedInformerFactory(clientset, viper.GetDuration("resync-interval")) eventsInformer := sharedInformers.Core().V1().Events() + stop, ctx := sigHandler(ctx) + // TODO: Support locking for HA https://github.com/kubernetes/kubernetes/pull/42666 eventRouter := NewEventRouter(ctx, clientset, eventsInformer) - stop := sigHandler() // Startup the http listener for Prometheus Metrics endpoint. go func() { diff --git a/sinks/glogsink.go b/sinks/glogsink.go index 1ffc376a6..5b8ca188f 100644 --- a/sinks/glogsink.go +++ b/sinks/glogsink.go @@ -17,30 +17,71 @@ limitations under the License. package sinks import ( + "context" "encoding/json" - "github.com/golang/glog" "k8s.io/api/core/v1" + "sync" ) +// let's have few go routines to emit out glog. +const glogSinkConcurrency = 5 + // GlogSink is the most basic sink // Useful when you already have ELK/EFK Stack type GlogSink struct { - // TODO: create a channel and buffer for scaling + updateChan chan UpdateEvent } // NewGlogSink will create a new -func NewGlogSink() EventSinkInterface { - return &GlogSink{} +func NewGlogSink(ctx context.Context) EventSinkInterface { + gs := &GlogSink{ + updateChan: make(chan UpdateEvent), + } + + var wg sync.WaitGroup + wg.Add(glogSinkConcurrency) + glog.V(3).Infof("Starting glog sink with concurrency=%d", glogSinkConcurrency) + // let's have couple of parallel routines + go func() { + defer wg.Done() + gs.updateEvents(ctx) + }() + + // wait + go func() { + wg.Wait() + glog.V(3).Info("Stopping glog sink WaitGroup") + close(gs.updateChan) + }() + + return gs } -// UpdateEvents implements the EventSinkInterface +// UpdateEvents implements the EventSinkInterface. +// This is not a non-blocking call because the channel could get full. But ATM I do not care because +// glog just logs the message. It is CPU heavy (JSON Marshalling) and has no I/O. So the time complexity of the +// blocking call is very minimal. Also we could spawn more routines of updateEvents to make it concurrent. func (gs *GlogSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) { - eData := NewEventData(eNew, eOld) + gs.updateChan <- UpdateEvent{ + eNew: eNew, + eOld: eOld, + } +} - if eJSONBytes, err := json.Marshal(eData); err == nil { - glog.Info(string(eJSONBytes)) - } else { - glog.Warningf("Failed to json serialize event: %v", err) +func (gs *GlogSink) updateEvents(ctx context.Context) { + for { + select { + case event := <-gs.updateChan: + eData := NewEventData(event.eNew, event.eOld) + if eJSONBytes, err := json.Marshal(eData); err == nil { + glog.Info(string(eJSONBytes)) + } else { + glog.Warningf("Failed to json serialize event: %v", err) + } + case <-ctx.Done(): + glog.Warning("Ending, glog sink receiver channel, got ctx.Done()") + return + } } } diff --git a/sinks/glogsink_test.go b/sinks/glogsink_test.go new file mode 100644 index 000000000..1bc2a3e30 --- /dev/null +++ b/sinks/glogsink_test.go @@ -0,0 +1,63 @@ +package sinks + +import ( + "context" + core_v1 "k8s.io/api/core/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" + "testing" +) + +func TestNewGlogSink(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + gs := NewGlogSink(ctx) + + client := fake.NewSimpleClientset() + + // test create + e := createTestEvent(t, client) + gs.UpdateEvents(e, nil) + + // test update + oldE, newE := updateTestEvent(t, client, e) + gs.UpdateEvents(newE, oldE) + + // we are not using it + deleteTestEvent(t, client) + + cancel() +} + +func createTestEvent(t *testing.T, client *fake.Clientset) *core_v1.Event { + var err error + e := &core_v1.Event{ObjectMeta: meta_v1.ObjectMeta{Name: "my-test-event"}} + e, err = client.CoreV1().Events("test-ns").Create(e) + if err != nil { + t.Errorf("Create Event Failed, %s", err) + } + + return e +} + +func updateTestEvent(t *testing.T, client *fake.Clientset, e *core_v1.Event) (*core_v1.Event, *core_v1.Event) { + var err error + + // send a update event + e.Reason = "foobar" + newE, err := client.CoreV1().Events("test-ns").Update(e) + if err != nil { + t.Errorf("Update Event Failed, %s", err) + } + + return e, newE +} + +func deleteTestEvent(t *testing.T, client *fake.Clientset) { + var err error + + err = client.CoreV1().Events("test-ns").Delete("my-test-event", &meta_v1.DeleteOptions{}) + if err != nil { + t.Errorf("Delete Event Failed, %s", err) + } + +} diff --git a/sinks/interfaces.go b/sinks/interfaces.go index 980911b97..01816c9e9 100644 --- a/sinks/interfaces.go +++ b/sinks/interfaces.go @@ -17,6 +17,7 @@ limitations under the License. package sinks import ( + "context" "errors" "fmt" @@ -25,39 +26,48 @@ import ( v1 "k8s.io/api/core/v1" ) -// EventSinkInterface is the interface used to shunt events +// update events could be send via channels too +type UpdateEvent struct { + eNew *v1.Event + eOld *v1.Event +} + +// EventSinkInterface is the interface used to shunt events. It is upto the implementer of the sink +// to make sure the type EventSinkInterface interface { UpdateEvents(eNew *v1.Event, eOld *v1.Event) } // ManufactureSink will manufacture a sink according to viper configs and return a list of sink interfaces -func ManufactureSink() (e []EventSinkInterface) { +func ManufactureSink(ctx context.Context) (e []EventSinkInterface) { + // check for new style first + // if we have sinks, then we are using the new style + sinks := viper.GetStringSlice("sinks") + if len(sinks) != 0 { + glog.Info("Using new configuration format") + return v2ManufactureSink(ctx, sinks) + } + // if we find "sink", it means we are using the older style config sink := viper.GetString("sink") if len(sink) != 0 { glog.Warning("Using older configuration format, please move to nested version") - return []EventSinkInterface{v1ManufactureSink(sink)} + return []EventSinkInterface{v1ManufactureSink(ctx, sink)} } - // if we have sinks, then we are using the new style - sinks := viper.GetStringSlice("sinks") - if len(sinks) != 0 { - glog.Info("Using new configuration format") - return v2ManufactureSink(sinks) - } return } // v2ManufactureSink is the later style of writing the config -func v2ManufactureSink(sinks []string) (e []EventSinkInterface) { +func v2ManufactureSink(ctx context.Context, sinks []string) (e []EventSinkInterface) { for _, sink := range sinks { switch sink { case "glog": - e = append(e, NewGlogSink()) + e = append(e, NewGlogSink(ctx)) case "stdout": viper.SetDefault("stdout.JSONNamespace", "") stdoutNamespace := viper.GetString("stdout.JSONNamespace") - e = append(e, NewStdoutSink(stdoutNamespace)) + e = append(e, NewStdoutSink(ctx, stdoutNamespace)) default: panic(fmt.Sprintf("Invalid Sink Specified, %s", sink)) } @@ -66,15 +76,15 @@ func v2ManufactureSink(sinks []string) (e []EventSinkInterface) { } // v1ManufactureSink is the old way config.json format -func v1ManufactureSink(s string) (e EventSinkInterface) { +func v1ManufactureSink(ctx context.Context, s string) (e EventSinkInterface) { glog.Infof("Sink is [%v]", s) switch s { case "glog": - e = NewGlogSink() + e = NewGlogSink(ctx) case "stdout": viper.SetDefault("stdoutJSONNamespace", "") stdoutNamespace := viper.GetString("stdoutJSONNamespace") - e = NewStdoutSink(stdoutNamespace) + e = NewStdoutSink(ctx, stdoutNamespace) case "http": url := viper.GetString("httpSinkUrl") if url == "" { diff --git a/sinks/interfaces_test.go b/sinks/interfaces_test.go index 35879b122..391286218 100644 --- a/sinks/interfaces_test.go +++ b/sinks/interfaces_test.go @@ -2,12 +2,17 @@ package sinks import ( "bytes" + "context" + "fmt" "github.com/spf13/viper" "reflect" "testing" ) -func Test_oldManufactureSink(t *testing.T) { +// minimum tests for what I am changing. Please add more as we edit more sinks + +func Test_v1ManufactureSink(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) type args struct { s string } @@ -19,24 +24,29 @@ func Test_oldManufactureSink(t *testing.T) { { name: "glog", args: args{s: "glog"}, - wantE: NewGlogSink(), + wantE: NewGlogSink(ctx), }, { name: "stdout", args: args{s: "stdout"}, - wantE: NewStdoutSink(""), + wantE: NewStdoutSink(ctx, ""), }, } + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if gotE := v1ManufactureSink(tt.args.s); !reflect.DeepEqual(gotE, tt.wantE) { + gotE := v1ManufactureSink(ctx, tt.args.s) + // let's check for the type + if !reflect.DeepEqual(fmt.Sprintf("%T", gotE), fmt.Sprintf("%T", tt.wantE)) { t.Errorf("oldManufactureSink() = %v, want %v", gotE, tt.wantE) } }) } + cancel() } func Test_v2ManufactureSink(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) type args struct { sinks []string } @@ -48,26 +58,36 @@ func Test_v2ManufactureSink(t *testing.T) { { name: "golog", args: args{sinks: []string{"glog"}}, - wantE: []EventSinkInterface{NewGlogSink()}, + wantE: []EventSinkInterface{NewGlogSink(ctx)}, }, { name: "stdout", args: args{sinks: []string{"stdout"}}, - wantE: []EventSinkInterface{NewStdoutSink("")}, + wantE: []EventSinkInterface{NewStdoutSink(ctx, "")}, }, { name: "glog_stdout", args: args{sinks: []string{"glog", "stdout"}}, - wantE: []EventSinkInterface{NewGlogSink(), NewStdoutSink("")}, + wantE: []EventSinkInterface{NewGlogSink(ctx), NewStdoutSink(ctx, "")}, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if gotE := v2ManufactureSink(tt.args.sinks); !reflect.DeepEqual(gotE, tt.wantE) { - t.Errorf("v2ManufactureSink() = %v, want %v", gotE, tt.wantE) + gotE := v2ManufactureSink(ctx, tt.args.sinks) + if len(gotE) != len(tt.wantE) { + t.Errorf("v2ManufactureSink() len = %d, want len %d", len(gotE), len(tt.wantE)) + } else { + for i := range gotE { + if !reflect.DeepEqual(fmt.Sprintf("%T", gotE[i]), fmt.Sprintf("%T", tt.wantE[i])) { + t.Errorf("v2ManufactureSink() Type = %T, want len %T", gotE[i], tt.wantE[i]) + } + } } + }) } + + cancel() } func Test_v2ManufactureSink_JSON(t *testing.T) { @@ -84,11 +104,20 @@ func Test_v2ManufactureSink_JSON(t *testing.T) { t.Fatalf("viper.ReadConfig Failed, %s", err) } - got := ManufactureSink() - wanted := []EventSinkInterface{NewGlogSink()} - if !reflect.DeepEqual(got, wanted) { - t.Errorf("v2ManufactureSink() = %v, want %v", got, wanted) + ctx, cancel := context.WithCancel(context.Background()) + gotE := ManufactureSink(ctx) + wantE := []EventSinkInterface{NewGlogSink(ctx)} + if len(gotE) != len(wantE) { + t.Errorf("v2ManufactureSink() len = %d, want len %d", len(gotE), len(wantE)) + } else { + for i := range gotE { + if !reflect.DeepEqual(fmt.Sprintf("%T", gotE[i]), fmt.Sprintf("%T", wantE[i])) { + t.Errorf("v2ManufactureSink() Type = %T, want len %T", gotE[i], wantE[i]) + } + } } + + cancel() } func Test_v2ManufactureSink_NestedJSON(t *testing.T) { @@ -107,10 +136,19 @@ func Test_v2ManufactureSink_NestedJSON(t *testing.T) { t.Fatalf("viper.ReadConfig Failed, %s", err) } - got := ManufactureSink() - wanted := []EventSinkInterface{NewGlogSink(), NewStdoutSink("foobar")} - if !reflect.DeepEqual(got, wanted) { - t.Errorf("v2ManufactureSink() = %v, want %v", got, wanted) + ctx, cancel := context.WithCancel(context.Background()) + gotE := ManufactureSink(ctx) + // this is not a valid test, the argument is not really tested + wantE := []EventSinkInterface{NewGlogSink(ctx), NewStdoutSink(ctx, "foobar")} + if len(gotE) != len(wantE) { + t.Errorf("v2ManufactureSink() len = %d, want len %d", len(gotE), len(wantE)) + } else { + for i := range gotE { + if !reflect.DeepEqual(fmt.Sprintf("%T", gotE[i]), fmt.Sprintf("%T", wantE[i])) { + t.Errorf("v2ManufactureSink() Type = %T, want len %T", gotE[i], wantE[i]) + } + } } + cancel() } diff --git a/sinks/stdoutsink.go b/sinks/stdoutsink.go index a89cd9bc9..9a7d60a0b 100644 --- a/sinks/stdoutsink.go +++ b/sinks/stdoutsink.go @@ -17,47 +17,94 @@ limitations under the License. package sinks import ( + "context" "encoding/json" "fmt" + "github.com/golang/glog" "os" + "sync" "k8s.io/api/core/v1" ) +// let's have few go routines to emit out to stdout. +const stdoutSinkConcurrency = 5 + // StdoutSink is the other basic sink // By default, Fluentd/ElasticSearch won't index glog formatted lines // By logging raw JSON to stdout, we will get automated indexing which // can be queried in Kibana. type StdoutSink struct { - // TODO: create a channel and buffer for scaling - namespace string + updateChan chan UpdateEvent + namespace string } - // NewStdoutSink will create a new StdoutSink with default options, returned as // an EventSinkInterface -func NewStdoutSink(namespace string) EventSinkInterface { - return &StdoutSink{ - namespace: namespace} +func NewStdoutSink(ctx context.Context, namespace string) EventSinkInterface { + gs := &StdoutSink{ + namespace: namespace, + updateChan: make(chan UpdateEvent), + } + + var wg sync.WaitGroup + wg.Add(stdoutSinkConcurrency) + glog.V(3).Infof("Starting stdout sink with concurrency=%d", stdoutSinkConcurrency) + + // let's have couple of parallel routines + go func() { + defer wg.Done() + gs.updateEvents(ctx) + }() + + // wait + go func() { + wg.Wait() + glog.V(3).Info("Stopping stdout sink WaitGroup") + close(gs.updateChan) + }() + + return gs +} + +// UpdateEvents implements the EventSinkInterface for stdout. Same like glog sink, this is not a non-blocking call because the +// channel could get full. But ATM I do not care because stdout just logs the message to stdout. It is CPU heavy (JSON Marshalling) +// and has no I/O. So the time complexity of the blocking call is very minimal. Also we could spawn more routines of +// updateEvents to make it concurrent. +func (ss *StdoutSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) { + ss.updateChan <- UpdateEvent{ + eNew: eNew, + eOld: eOld, + } } // UpdateEvents implements the EventSinkInterface -func (gs *StdoutSink) UpdateEvents(eNew *v1.Event, eOld *v1.Event) { - eData := NewEventData(eNew, eOld) - - if len(gs.namespace) > 0 { - namespacedData := map[string]interface{}{} - namespacedData[gs.namespace] = eData - if eJSONBytes, err := json.Marshal(namespacedData); err == nil { - fmt.Println(string(eJSONBytes)) - } else { - fmt.Fprintf(os.Stderr, "Failed to json serialize event: %v", err) - } - } else { - if eJSONBytes, err := json.Marshal(eData); err == nil { - fmt.Println(string(eJSONBytes)) - } else { - fmt.Fprintf(os.Stderr, "Failed to json serialize event: %v", err) +func (ss *StdoutSink) updateEvents(ctx context.Context) { + for { + select { + case event := <-ss.updateChan: + eData := NewEventData(event.eNew, event.eOld) + if len(ss.namespace) > 0 { + namespacedData := map[string]interface{}{} + namespacedData[ss.namespace] = eData + if eJSONBytes, err := json.Marshal(namespacedData); err == nil { + fmt.Println(string(eJSONBytes)) + } else { + // no point handing err, we have a bigger problem, should we panic? + _, _ = fmt.Fprintf(os.Stderr, "Failed to json serialize event: %v\n", err) + } + } else { + if eJSONBytes, err := json.Marshal(eData); err == nil { + fmt.Println(string(eJSONBytes)) + } else { + // no point handing err, we have a bigger problem, should we panic? + _, _ = fmt.Fprintf(os.Stderr, "Failed to json serialize event: %v\n", err) + } + } + case <-ctx.Done(): + // no point handing err, we have a bigger problem, should we panic? + glog.V(3).Infof("Ending, stdout sink receiver channel, got ctx.Done()\n") + return } } } diff --git a/sinks/stdoutsink_test.go b/sinks/stdoutsink_test.go new file mode 100644 index 000000000..043d4829e --- /dev/null +++ b/sinks/stdoutsink_test.go @@ -0,0 +1,76 @@ +package sinks + +import ( + "bytes" + "context" + "io" + "k8s.io/client-go/kubernetes/fake" + "os" + "testing" + "time" +) + +func TestNewStdoutSink(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + var err error + + old := os.Stdout + r, w, err := os.Pipe() + if err != nil { + t.Errorf("os.Pipe Failed, %s", err) + } + os.Stdout = w + + ss := NewStdoutSink(ctx, "") + + client := fake.NewSimpleClientset() + + // test create + e := createTestEvent(t, client) + ss.UpdateEvents(e, nil) + + // test update + oldE, newE := updateTestEvent(t, client, e) + ss.UpdateEvents(newE, oldE) + + // we are not using it + deleteTestEvent(t, client) + + cancel() + + err = w.Close() + if err != nil { + t.Errorf("Close Failed, %s", err) + } + + var buf bytes.Buffer + var n int64 + n, err = io.Copy(&buf, r) + if err != nil { + t.Errorf("io.Copy Failed (written=%d), %s", n, err) + } + + os.Stdout = old + + expected := `{"verb":"ADDED","event":{"metadata":{"name":"my-test-event","namespace":"test-ns","creationTimestamp":null},"involvedObject":{},"reason":"foobar","source":{},"firstTimestamp":null,"lastTimestamp":null,"eventTime":null,"reportingComponent":"","reportingInstance":""}} +{"verb":"UPDATED","event":{"metadata":{"name":"my-test-event","namespace":"test-ns","creationTimestamp":null},"involvedObject":{},"reason":"foobar","source":{},"firstTimestamp":null,"lastTimestamp":null,"eventTime":null,"reportingComponent":"","reportingInstance":""},"old_event":{"metadata":{"name":"my-test-event","namespace":"test-ns","creationTimestamp":null},"involvedObject":{},"reason":"foobar","source":{},"firstTimestamp":null,"lastTimestamp":null,"eventTime":null,"reportingComponent":"","reportingInstance":""}} +` + // will take a while to flush? + // retry couple of times to make sure it get's flushed + var flag bool + var count = 3 + for count > 0 { + count-- + if expected == buf.String() { + flag = true + break + } + // not sure how to do it right? this whole stdout testing! + time.Sleep(10 * time.Millisecond) + } + + if !flag { + t.Errorf("Expected=%s\nGot=%s", expected, buf.String()) + } + +}